Import zstd 0.12.3+zstd.1.5.2 upstream upstream/0.12.3
author    DongHun Kwak <dh0128.kwak@samsung.com>
          Tue, 4 Apr 2023 00:52:54 +0000 (09:52 +0900)
committer DongHun Kwak <dh0128.kwak@samsung.com>
          Tue, 4 Apr 2023 00:52:54 +0000 (09:52 +0900)
38 files changed:
.cargo_vcs_info.json [new file with mode: 0644]
.gitattributes [new file with mode: 0644]
.github/workflows/linux.yml [new file with mode: 0644]
.github/workflows/macos.yml [new file with mode: 0644]
.github/workflows/windows.yml [new file with mode: 0644]
.gitignore [new file with mode: 0644]
.gitmodules [new file with mode: 0644]
.travis.yml [new file with mode: 0644]
Cargo.toml [new file with mode: 0644]
Cargo.toml.orig [new file with mode: 0644]
LICENSE [new file with mode: 0644]
Readme.md [new file with mode: 0644]
appveyor.yml [new file with mode: 0644]
assets/example.txt [new file with mode: 0644]
examples/benchmark.rs [new file with mode: 0644]
examples/stream.rs [new file with mode: 0644]
examples/train.rs [new file with mode: 0644]
examples/zstd.rs [new file with mode: 0644]
examples/zstdcat.rs [new file with mode: 0644]
rustfmt.toml [new file with mode: 0644]
src/bulk/compressor.rs [new file with mode: 0644]
src/bulk/decompressor.rs [new file with mode: 0644]
src/bulk/mod.rs [new file with mode: 0644]
src/bulk/tests.rs [new file with mode: 0644]
src/dict.rs [new file with mode: 0644]
src/lib.rs [new file with mode: 0644]
src/stream/functions.rs [new file with mode: 0644]
src/stream/mod.rs [new file with mode: 0644]
src/stream/raw.rs [new file with mode: 0644]
src/stream/read/mod.rs [new file with mode: 0644]
src/stream/read/tests.rs [new file with mode: 0644]
src/stream/tests.rs [new file with mode: 0644]
src/stream/write/mod.rs [new file with mode: 0644]
src/stream/write/tests.rs [new file with mode: 0644]
src/stream/zio/mod.rs [new file with mode: 0644]
src/stream/zio/reader.rs [new file with mode: 0644]
src/stream/zio/writer.rs [new file with mode: 0644]
tests/issue_182.rs [new file with mode: 0644]

diff --git a/.cargo_vcs_info.json b/.cargo_vcs_info.json
new file mode 100644 (file)
index 0000000..b612ad6
--- /dev/null
@@ -0,0 +1,6 @@
+{
+  "git": {
+    "sha1": "12867809bf3cccf4531c597548ca8bbf14696730"
+  },
+  "path_in_vcs": ""
+}
\ No newline at end of file
diff --git a/.gitattributes b/.gitattributes
new file mode 100644 (file)
index 0000000..51942d1
--- /dev/null
@@ -0,0 +1 @@
+/assets/* -text -crlf
diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml
new file mode 100644 (file)
index 0000000..7932138
--- /dev/null
@@ -0,0 +1,24 @@
+name: Linux
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+
+env:
+  CARGO_TERM_COLOR: always
+
+jobs:
+  build:
+
+    runs-on: ubuntu-latest
+
+    steps:
+    - uses: actions/checkout@v2
+      with:
+        submodules: recursive
+    - name: Build
+      run: cargo build --verbose
+    - name: Run tests
+      run: cargo test --verbose
diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml
new file mode 100644 (file)
index 0000000..3e2ab2d
--- /dev/null
@@ -0,0 +1,25 @@
+name: macOS
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+
+env:
+  CARGO_TERM_COLOR: always
+
+jobs:
+  build:
+
+    runs-on: macos-latest
+
+    steps:
+    - uses: actions/checkout@v2
+      with:
+        submodules: recursive
+    - name: Build
+      run: cargo build --verbose
+    - name: Run tests
+      run: cargo test --verbose
+
diff --git a/.github/workflows/windows.yml b/.github/workflows/windows.yml
new file mode 100644 (file)
index 0000000..f775d09
--- /dev/null
@@ -0,0 +1,61 @@
+name: Windows
+
+on:
+  push:
+    branches: [ main ]
+  pull_request:
+    branches: [ main ]
+
+env:
+  CARGO_TERM_COLOR: always
+
+jobs:
+  build:
+
+    strategy:
+      matrix:
+        target:
+          #- i686-pc-windows-gnu
+          - i686-pc-windows-msvc
+          #- x86_64-pc-windows-gnu
+          - x86_64-pc-windows-msvc
+        channel: [ stable ]
+
+    runs-on: windows-latest
+
+    steps:
+    - uses: actions/checkout@v2
+      with:
+        submodules: recursive
+    - name: setup
+      uses: actions-rs/toolchain@v1
+      with:
+        toolchain: ${{ matrix.channel }}-${{ matrix.target }}
+        target: ${{ matrix.target }}
+        override: true
+        profile: minimal
+        default: true
+
+    - name: Add mingw32 to path for i686-gnu
+      run: |
+        echo "C:\msys64\mingw32\bin" >> $GITHUB_PATH
+        echo "C:\msys64\usr\bin" >> $GITHUB_PATH
+      if: matrix.target == 'i686-pc-windows-gnu'
+      shell: bash
+    - name: Add mingw64 to path for x86_64-gnu
+      run: |
+        echo "C:\msys64\mingw64\bin" >> $GITHUB_PATH
+        echo "C:\msys64\usr\bin" >> $GITHUB_PATH
+      if: matrix.target == 'x86_64-pc-windows-gnu'
+      shell: bash
+    - name: Update gcc
+      if: matrix.target == 'x86_64-pc-windows-gnu'
+      run: pacman.exe -Sy --noconfirm mingw-w64-x86_64-toolchain
+    - name: Update gcc
+      if: matrix.target == 'i686-pc-windows-gnu'
+      run: pacman.exe -Sy --noconfirm mingw-w64-i686-toolchain
+
+    - name: Build
+      run: cargo build --verbose
+    - name: Run tests
+      run: cargo test --verbose
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..80ee6af
--- /dev/null
@@ -0,0 +1,4 @@
+target
+Cargo.lock
+/silesia
+/silesia.zip
diff --git a/.gitmodules b/.gitmodules
new file mode 100644 (file)
index 0000000..6c798c2
--- /dev/null
@@ -0,0 +1,3 @@
+[submodule "zstd-safe/zstd-sys/zstd"]
+       path = zstd-safe/zstd-sys/zstd
+       url = https://github.com/facebook/zstd
diff --git a/.travis.yml b/.travis.yml
new file mode 100644 (file)
index 0000000..aca5d37
--- /dev/null
@@ -0,0 +1,23 @@
+os:
+    - linux
+    - windows
+
+language: rust
+rust:
+    - stable
+    - beta
+    - nightly
+env:
+    matrix:
+    # Combinations of optional features
+    - FEATURES=''
+    - FEATURES='tokio'
+    - FEATURES='bindgen'
+
+matrix:
+  allow_failures:
+    - rust: nightly
+
+script:
+    - cargo build --verbose --features "$FEATURES"
+    - cargo test --verbose --features "$FEATURES"
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644 (file)
index 0000000..c6e47f6
--- /dev/null
@@ -0,0 +1,86 @@
+# THIS FILE IS AUTOMATICALLY GENERATED BY CARGO
+#
+# When uploading crates to the registry Cargo will automatically
+# "normalize" Cargo.toml files for maximal compatibility
+# with all versions of Cargo and also rewrite `path` dependencies
+# to registry (e.g., crates.io) dependencies.
+#
+# If you are reading this file be aware that the original Cargo.toml
+# will likely look very different (and much more reasonable).
+# See Cargo.toml.orig for the original contents.
+
+[package]
+edition = "2018"
+rust-version = "1.43"
+name = "zstd"
+version = "0.12.3+zstd.1.5.2"
+authors = ["Alexandre Bury <alexandre.bury@gmail.com>"]
+exclude = ["assets/*.zst"]
+description = "Binding for the zstd compression library."
+documentation = "https://docs.rs/zstd"
+readme = "Readme.md"
+keywords = [
+    "zstd",
+    "zstandard",
+    "compression",
+]
+categories = [
+    "compression",
+    "api-bindings",
+]
+license = "MIT"
+repository = "https://github.com/gyscos/zstd-rs"
+
+[package.metadata.docs.rs]
+features = [
+    "experimental",
+    "zstdmt",
+    "zdict_builder",
+    "doc-cfg",
+]
+
+[[example]]
+name = "train"
+required-features = ["zdict_builder"]
+
+[dependencies.zstd-safe]
+version = "6.0.3"
+features = ["std"]
+default-features = false
+
+[dev-dependencies.clap]
+version = "4.0"
+features = ["derive"]
+
+[dev-dependencies.humansize]
+version = "2.0"
+
+[dev-dependencies.partial-io]
+version = "0.5"
+
+[dev-dependencies.walkdir]
+version = "2.2"
+
+[features]
+arrays = ["zstd-safe/arrays"]
+bindgen = ["zstd-safe/bindgen"]
+debug = ["zstd-safe/debug"]
+default = [
+    "legacy",
+    "arrays",
+    "zdict_builder",
+]
+doc-cfg = []
+experimental = ["zstd-safe/experimental"]
+fat-lto = ["zstd-safe/fat-lto"]
+legacy = ["zstd-safe/legacy"]
+no_asm = ["zstd-safe/no_asm"]
+pkg-config = ["zstd-safe/pkg-config"]
+thin = ["zstd-safe/thin"]
+thin-lto = ["zstd-safe/thin-lto"]
+wasm = []
+zdict_builder = ["zstd-safe/zdict_builder"]
+zstdmt = ["zstd-safe/zstdmt"]
+
+[badges.travis-ci]
+repository = "gyscos/zstd-rs"
diff --git a/Cargo.toml.orig b/Cargo.toml.orig
new file mode 100644 (file)
index 0000000..0490fbc
--- /dev/null
@@ -0,0 +1,54 @@
+[package]
+authors = ["Alexandre Bury <alexandre.bury@gmail.com>"]
+description = "Binding for the zstd compression library."
+documentation = "https://docs.rs/zstd"
+keywords = ["zstd", "zstandard", "compression"]
+categories = ["compression", "api-bindings"]
+license = "MIT"
+name = "zstd"
+repository = "https://github.com/gyscos/zstd-rs"
+version = "0.12.3+zstd.1.5.2"
+exclude = ["assets/*.zst"]
+readme = "Readme.md"
+edition = "2018"
+rust-version = "1.43"  # Some features (including default ones) can raise this value
+
+[package.metadata.docs.rs]
+features = ["experimental", "zstdmt", "zdict_builder", "doc-cfg"]
+
+[badges]
+travis-ci = { repository = "gyscos/zstd-rs" }
+
+[dependencies]
+zstd-safe = { path = "zstd-safe", version = "6.0.3", default-features = false, features = ["std"] }
+
+[dev-dependencies]
+clap = {version = "4.0", features=["derive"]}
+humansize = "2.0"
+partial-io = "0.5"
+walkdir = "2.2"
+
+[features]
+default = ["legacy", "arrays", "zdict_builder"]
+
+bindgen = ["zstd-safe/bindgen"]
+debug = ["zstd-safe/debug"]
+legacy = ["zstd-safe/legacy"]
+pkg-config = ["zstd-safe/pkg-config"]
+wasm = []
+zstdmt = ["zstd-safe/zstdmt"]
+experimental = ["zstd-safe/experimental"]
+thin = ["zstd-safe/thin"]
+arrays = ["zstd-safe/arrays"]
+no_asm = ["zstd-safe/no_asm"]
+doc-cfg = []
+zdict_builder = ["zstd-safe/zdict_builder"]
+
+# These two are for cross-language LTO.
+# Will only work if `clang` is used to build the C library.
+fat-lto = ["zstd-safe/fat-lto"]
+thin-lto = ["zstd-safe/thin-lto"]
+
+[[example]]
+name = "train"
+required-features = ["zdict_builder"]
diff --git a/LICENSE b/LICENSE
new file mode 100644 (file)
index 0000000..1190b3c
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,8 @@
+The MIT License (MIT)
+Copyright (c) 2016 Alexandre Bury
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/Readme.md b/Readme.md
new file mode 100644 (file)
index 0000000..d2c4b1b
--- /dev/null
+++ b/Readme.md
@@ -0,0 +1,109 @@
+# zstd
+
+[![Build on Linux](https://github.com/gyscos/zstd-rs/actions/workflows/linux.yml/badge.svg)](https://github.com/gyscos/zstd-rs/actions/workflows/linux.yml)
+[![Build on Windows](https://github.com/gyscos/zstd-rs/actions/workflows/windows.yml/badge.svg)](https://github.com/gyscos/zstd-rs/actions/workflows/windows.yml)
+[![Build on macOS](https://github.com/gyscos/zstd-rs/actions/workflows/macos.yml/badge.svg)](https://github.com/gyscos/zstd-rs/actions/workflows/macos.yml)
+[![crates.io](https://img.shields.io/crates/v/zstd.svg)](https://crates.io/crates/zstd)
+[![MIT licensed](https://img.shields.io/badge/license-MIT-blue.svg)](./LICENSE)
+
+This library is a Rust binding for the [zstd compression library][zstd].
+
+# [Documentation][doc]
+
+## 1 - Add to `Cargo.toml`
+
+### Using [cargo-edit]
+
+```bash
+$ cargo add zstd
+```
+
+### Manually
+
+```toml
+# Cargo.toml
+
+[dependencies]
+zstd = "0.12"
+```
+
+## 2 - Usage
+
+This library provides `Read` and `Write` wrappers to handle (de)compression,
+along with convenience functions to make common tasks easier.
+
+For instance, `stream::copy_encode` and `stream::copy_decode` are easy-to-use
+wrappers around `std::io::copy`. Check the [stream] example:
+
+```rust
+use std::io;
+
+// This function uses the convenient `copy_encode` function:
+fn compress(level: i32) {
+    zstd::stream::copy_encode(io::stdin(), io::stdout(), level).unwrap();
+}
+
+// This function does the same thing, directly using an `Encoder`:
+fn compress_manually(level: i32) {
+    let mut encoder = zstd::stream::Encoder::new(io::stdout(), level).unwrap();
+    io::copy(&mut io::stdin(), &mut encoder).unwrap();
+    encoder.finish().unwrap();
+}
+
+fn decompress() {
+    zstd::stream::copy_decode(io::stdin(), io::stdout()).unwrap();
+}
+```
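+
+For small buffers already in memory, the one-shot `encode_all` and
+`decode_all` functions (re-exported at the crate root) avoid manual stream
+handling. A minimal round-trip sketch (a level of `0` uses zstd's default):
+
+```rust
+let data = b"hello, zstd!";
+let compressed = zstd::encode_all(&data[..], 0).unwrap();
+let decompressed = zstd::decode_all(&compressed[..]).unwrap();
+assert_eq!(&data[..], &decompressed[..]);
+```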
+
+# Asynchronous support
+
+The [`async-compression`](https://github.com/Nemo157/async-compression/) crate
+provides an async-ready integration of various compression algorithms,
+including `zstd-rs`.
+
+# Compile it yourself
+
+`zstd` is included as a submodule. To get everything during your clone, use:
+
+```
+git clone https://github.com/gyscos/zstd-rs --recursive
+```
+
+Or, if you cloned it without the `--recursive` flag,
+call this from inside the repository:
+
+```
+git submodule update --init
+```
+
+Then, running `cargo build` should take care
+of building the C library and linking to it.
+
+# Build-time bindgen
+
+This library includes a pre-generated `bindings.rs` file.
+You can also generate new bindings at build-time, using the `bindgen` feature:
+
+```
+cargo build --features bindgen
+```
+
+# TODO
+
+* Benchmarks, optimizations, ...
+
+# Disclaimer
+
+This implementation is largely inspired by bozaro's [lz4-rs].
+
+# License
+
+* The zstd C library is under a dual BSD/GPLv2 license.
+* This zstd-rs binding library is under a [MIT](LICENSE) license.
+
+[zstd]: https://github.com/facebook/zstd
+[lz4-rs]: https://github.com/bozaro/lz4-rs
+[cargo-edit]: https://github.com/killercup/cargo-edit#cargo-add
+[doc]: https://docs.rs/zstd
+[stream]: examples/stream.rs
+[submodule]: https://git-scm.com/book/en/v2/Git-Tools-Submodules
diff --git a/appveyor.yml b/appveyor.yml
new file mode 100644 (file)
index 0000000..33cb406
--- /dev/null
@@ -0,0 +1,40 @@
+os: Visual Studio 2015
+
+cache:
+  - c:\cargo\registry
+  - c:\cargo\git
+
+init:
+  - mkdir c:\cargo
+  - mkdir c:\rustup
+  - SET PATH=c:\cargo\bin;%PATH%
+
+environment:
+  CARGO_HOME: "c:\\cargo"
+  RUSTUP_HOME: "c:\\rustup"
+
+  matrix:
+    - TARGET: i686-pc-windows-msvc
+      CHANNEL: stable
+    - TARGET: x86_64-pc-windows-msvc
+      CHANNEL: stable
+    - TARGET: i686-pc-windows-gnu
+      CHANNEL: stable
+    - TARGET: x86_64-pc-windows-gnu
+      CHANNEL: stable
+
+install:
+  - appveyor DownloadFile https://win.rustup.rs/ -FileName rustup-init.exe
+  - rustup-init.exe -y --default-host %TARGET% --no-modify-path
+  - set PATH=%PATH%;%USERPROFILE%\.cargo\bin
+  - rustc -V
+  - cargo -V
+  - git submodule update --init --recursive
+
+# 'cargo test' takes care of building for us, so disable Appveyor's build stage. This prevents
+# the "directory does not contain a project or solution file" error.
+build: false
+
+# Equivalent to Travis' `script` phase
+test_script:
+  - cargo test --verbose %CARGOFLAGS%
diff --git a/assets/example.txt b/assets/example.txt
new file mode 100644 (file)
index 0000000..8e89355
--- /dev/null
@@ -0,0 +1,40 @@
+’Twas brillig, and the slithy toves
+Did gyre and gimble in the wabe;
+All mimsy were the borogoves,
+And the mome raths outgrabe.
+
+
+"Beware the Jabberwock, my son!
+The jaws that bite, the claws that catch!
+Beware the Jubjub bird, and shun
+The frumious Bandersnatch!"
+
+
+He took his vorpal sword in hand:
+Long time the manxome foe he sought—
+So rested he by the Tumtum tree,
+And stood awhile in thought.
+
+
+And as in uffish thought he stood,
+The Jabberwock, with eyes of flame,
+Came whiffling through the tulgey wood,
+And burbled as it came!
+
+
+One, two! One, two! And through and through
+The vorpal blade went snicker-snack!
+He left it dead, and with its head
+He went galumphing back.
+
+
+"And hast thou slain the Jabberwock?
+Come to my arms, my beamish boy!
+O frabjous day! Callooh! Callay!"
+He chortled in his joy.
+
+
+’Twas brillig, and the slithy toves
+Did gyre and gimble in the wabe;
+All mimsy were the borogoves,
+And the mome raths outgrabe.
diff --git a/examples/benchmark.rs b/examples/benchmark.rs
new file mode 100644 (file)
index 0000000..b750262
--- /dev/null
@@ -0,0 +1,99 @@
+use clap::Parser;
+use humansize::{format_size, DECIMAL};
+use std::io::Read;
+use std::path::PathBuf;
+
+#[derive(Parser, Debug)]
+#[command(author, version, about, long_about=None)]
+struct Args {
+    /// Directory containing the data to compress.
+    /// To use the silesia corpus, run the following commands:
+    ///
+    /// ```
+    /// wget http://sun.aei.polsl.pl/~sdeor/corpus/silesia.zip
+    /// unzip silesia.zip -d silesia/
+    /// cargo run --example benchmark -- silesia/
+    /// ```
+    dir: PathBuf,
+
+    /// First compression level to test.
+    #[arg(short, long)]
+    begin: i32,
+
+    /// Last compression level to test.
+    #[arg(short, long)]
+    end: i32,
+}
+
+fn main() {
+    let args = Args::parse();
+
+    // Step 1: load data in memory
+    let files: Vec<Vec<u8>> = std::fs::read_dir(args.dir)
+        .unwrap()
+        .map(|file| {
+            let file = file.unwrap();
+
+            let mut content = Vec::new();
+            std::fs::File::open(file.path())
+                .unwrap()
+                .read_to_end(&mut content)
+                .unwrap();
+            content
+        })
+        .collect();
+
+    let total_size: usize = files.iter().map(|data| data.len()).sum();
+
+    // Step 2: compress and decompress the data
+
+    // Print tsv headers
+    println!(
+        "{}\t{}\t{}\t{}",
+        "Compression level",
+        "Compression ratio",
+        "Compression speed",
+        "Decompression speed"
+    );
+
+    for level in args.begin..args.end {
+        // Compress each sample sequentially.
+        let start = std::time::Instant::now();
+
+        let compressed: Vec<Vec<u8>> = files
+            .iter()
+            .map(|data| zstd::encode_all(&data[..], level).unwrap())
+            .collect();
+        let mid = std::time::Instant::now();
+
+        let uncompressed: Vec<Vec<u8>> = compressed
+            .iter()
+            .map(|data| zstd::decode_all(&data[..]).unwrap())
+            .collect();
+        let end = std::time::Instant::now();
+
+        for (original, processed) in files.iter().zip(uncompressed.iter()) {
+            assert_eq!(&original[..], &processed[..]);
+        }
+
+        let compress_time = mid - start;
+        let decompress_time = end - mid;
+
+        let compress_seconds = compress_time.as_secs() as f64
+            + compress_time.subsec_nanos() as f64 * 1e-9;
+
+        let decompress_seconds = decompress_time.as_secs() as f64
+            + decompress_time.subsec_nanos() as f64 * 1e-9;
+
+        let compressed_size: usize = compressed.iter().map(Vec::len).sum();
+
+        let speed = (total_size as f64 / compress_seconds) as usize;
+        let speed = format_size(speed, DECIMAL);
+
+        let d_speed = (total_size as f64 / decompress_seconds) as usize;
+        let d_speed = format_size(d_speed, DECIMAL);
+
+        let ratio = compressed_size as f64 / total_size as f64;
+        println!("{}\t{:.3}\t{}/s\t{}/s", level, 1.0 / ratio, speed, d_speed);
+    }
+}
diff --git a/examples/stream.rs b/examples/stream.rs
new file mode 100644 (file)
index 0000000..41e2651
--- /dev/null
@@ -0,0 +1,39 @@
+use std::env;
+use std::io::{self, Write};
+use std::str::FromStr;
+
+fn main() {
+    match env::args().nth(1) {
+        None => {
+            writeln!(
+                &mut io::stderr(),
+                "Invalid option. Usage: `stream [-d|-1..-21]`"
+            )
+            .unwrap();
+        }
+        Some(ref option) if option == "-d" => decompress(),
+        Some(ref option) => {
+            if option.starts_with('-') {
+                let level = match i32::from_str(&option[1..]) {
+                    Ok(level) => level,
+                    Err(e) => panic!("Error parsing compression level: {}", e),
+                };
+                compress(level);
+            } else {
+                writeln!(
+                    &mut io::stderr(),
+                    "Invalid option. Usage: `stream [-d|-1..-21]`"
+                )
+                .unwrap();
+            }
+        }
+    }
+}
+
+fn compress(level: i32) {
+    zstd::stream::copy_encode(io::stdin(), io::stdout(), level).unwrap();
+}
+
+fn decompress() {
+    zstd::stream::copy_decode(io::stdin(), io::stdout()).unwrap();
+}
diff --git a/examples/train.rs b/examples/train.rs
new file mode 100644 (file)
index 0000000..601d8d6
--- /dev/null
@@ -0,0 +1,29 @@
+use clap::Parser;
+use std::io;
+use std::path::PathBuf;
+
+#[derive(Parser, Debug)]
+#[command(author, version, about, long_about=None)]
+/// This program trains a dictionary from one or more files,
+/// to make future compression of similar small files more efficient.
+///
+/// The dictionary will need to be present during decompression,
+/// but if you need to compress many small files individually,
+/// it may be worth the trouble.
+struct Args {
+    /// Maximum dictionary size in bytes.
+    #[arg(short, long)]
+    max_size: usize,
+
+    /// Files to use as input.
+    files: Vec<PathBuf>,
+}
+
+fn main() {
+    let args = Args::parse();
+
+    let dict = zstd::dict::from_files(&args.files, args.max_size).unwrap();
+
+    let mut dict_reader: &[u8] = &dict;
+    io::copy(&mut dict_reader, &mut io::stdout()).unwrap();
+}
diff --git a/examples/zstd.rs b/examples/zstd.rs
new file mode 100644 (file)
index 0000000..3d485f8
--- /dev/null
@@ -0,0 +1,49 @@
+use zstd;
+
+use std::env;
+use std::fs;
+use std::io;
+
+const SUFFIX: &'static str = ".zst";
+
+fn main() {
+    for arg in env::args().skip(1) {
+        if arg.ends_with(SUFFIX) {
+            match decompress(&arg) {
+                Ok(()) => println!("Decompressed {}", arg),
+                Err(e) => println!("Error decompressing {}: {}", arg, e),
+            }
+        } else {
+            match compress(&arg) {
+                Ok(()) => println!("Compressed {}", arg),
+                Err(e) => println!("Error compressing {}: {}", arg, e),
+            }
+        }
+    }
+}
+
+fn compress(source: &str) -> io::Result<()> {
+    let mut file = fs::File::open(source)?;
+    let mut encoder = {
+        let target = fs::File::create(source.to_string() + SUFFIX)?;
+        zstd::Encoder::new(target, 1)?
+    };
+
+    io::copy(&mut file, &mut encoder)?;
+    encoder.finish()?;
+
+    Ok(())
+}
+
+fn decompress(source: &str) -> io::Result<()> {
+    let mut decoder = {
+        let file = fs::File::open(source)?;
+        zstd::Decoder::new(file)?
+    };
+
+    let mut target = fs::File::create(source.trim_end_matches(SUFFIX))?;
+
+    io::copy(&mut decoder, &mut target)?;
+
+    Ok(())
+}
diff --git a/examples/zstdcat.rs b/examples/zstdcat.rs
new file mode 100644 (file)
index 0000000..c1b3dc8
--- /dev/null
@@ -0,0 +1,40 @@
+use clap::Parser;
+use std::fs;
+use std::io;
+
+#[derive(Parser, Debug)]
+#[command(author, version, about, long_about=None)]
+struct Args {
+    /// Files to decompress. With no file, or when given -, read standard input.
+    file: Vec<String>,
+}
+
+fn main() {
+    // This will be a simple application:
+    // takes a single (repeatable and optional) argument.
+    let args = Args::parse();
+
+    // If nothing was given, act as if `-` was there.
+    if args.file.is_empty() {
+        decompress_file("-").unwrap();
+    } else {
+        for file in &args.file {
+            decompress_file(file).unwrap();
+        }
+    }
+}
+
+// Dispatch the source reader depending on the filename
+fn decompress_file(file: &str) -> io::Result<()> {
+    match file {
+        "-" => decompress_from(io::stdin()),
+        other => decompress_from(io::BufReader::new(fs::File::open(other)?)),
+    }
+}
+
+// Decompress from a `Reader` into stdout
+fn decompress_from<R: io::Read>(r: R) -> io::Result<()> {
+    let mut decoder = zstd::Decoder::new(r)?;
+    io::copy(&mut decoder, &mut io::stdout())?;
+    Ok(())
+}
diff --git a/rustfmt.toml b/rustfmt.toml
new file mode 100644 (file)
index 0000000..a067bf0
--- /dev/null
@@ -0,0 +1,3 @@
+max_width = 79
+reorder_imports = true
+use_try_shorthand = true
diff --git a/src/bulk/compressor.rs b/src/bulk/compressor.rs
new file mode 100644 (file)
index 0000000..1ac948d
--- /dev/null
@@ -0,0 +1,164 @@
+use crate::map_error_code;
+
+use std::io;
+use zstd_safe;
+
+/// Allows compressing multiple chunks of data independently.
+///
+/// Each job will be processed entirely in-memory without streaming, so this
+/// is most fitting for many small jobs. To compress larger volumes that don't
+/// easily fit in memory, streaming compression may be more appropriate.
+///
+/// It is more efficient than a streaming compressor for two reasons:
+/// * It re-uses the zstd context between jobs to avoid re-allocations
+/// * It avoids copying data from a `Read` into a temporary buffer before compression.
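+///
+/// A minimal usage sketch, re-using one context across several jobs:
+///
+/// ```
+/// let mut compressor = zstd::bulk::Compressor::new(3).unwrap();
+/// let first = compressor.compress(b"first chunk").unwrap();
+/// let second = compressor.compress(b"second chunk").unwrap();
+/// ```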
+#[derive(Default)]
+pub struct Compressor<'a> {
+    context: zstd_safe::CCtx<'a>,
+}
+
+impl Compressor<'static> {
+    /// Creates a new zstd compressor
+    pub fn new(level: i32) -> io::Result<Self> {
+        Self::with_dictionary(level, &[])
+    }
+
+    /// Creates a new zstd compressor, using the given dictionary.
+    ///
+    /// Note that using a dictionary means that decompression will need to use
+    /// the same dictionary.
+    pub fn with_dictionary(level: i32, dictionary: &[u8]) -> io::Result<Self> {
+        let mut compressor = Self::default();
+
+        compressor.set_dictionary(level, dictionary)?;
+
+        Ok(compressor)
+    }
+}
+
+impl<'a> Compressor<'a> {
+    /// Creates a new compressor using an existing `EncoderDictionary`.
+    ///
+    /// The compression level will be the one specified when creating the dictionary.
+    ///
+    /// Note that using a dictionary means that decompression will need to use
+    /// the same dictionary.
+    pub fn with_prepared_dictionary<'b>(
+        dictionary: &'a crate::dict::EncoderDictionary<'b>,
+    ) -> io::Result<Self>
+    where
+        'b: 'a,
+    {
+        let mut compressor = Self::default();
+
+        compressor.set_prepared_dictionary(dictionary)?;
+
+        Ok(compressor)
+    }
+
+    /// Changes the compression level used by this compressor.
+    ///
+    /// *This will clear any dictionary previously registered.*
+    ///
+    /// If you want to keep the existing dictionary, you will need to pass it again to
+    /// `Self::set_dictionary` instead of using this method.
+    pub fn set_compression_level(&mut self, level: i32) -> io::Result<()> {
+        self.set_dictionary(level, &[])
+    }
+
+    /// Changes the dictionary and compression level used by this compressor.
+    ///
+    /// Will affect future compression jobs.
+    ///
+    /// Note that using a dictionary means that decompression will need to use
+    /// the same dictionary.
+    pub fn set_dictionary(
+        &mut self,
+        level: i32,
+        dictionary: &[u8],
+    ) -> io::Result<()> {
+        self.context
+            .set_parameter(zstd_safe::CParameter::CompressionLevel(level))
+            .map_err(map_error_code)?;
+
+        self.context
+            .load_dictionary(dictionary)
+            .map_err(map_error_code)?;
+
+        Ok(())
+    }
+
+    /// Changes the dictionary used by this compressor.
+    ///
+    /// The compression level used when preparing the dictionary will be used.
+    ///
+    /// Note that using a dictionary means that decompression will need to use
+    /// the same dictionary.
+    pub fn set_prepared_dictionary<'b>(
+        &mut self,
+        dictionary: &'a crate::dict::EncoderDictionary<'b>,
+    ) -> io::Result<()>
+    where
+        'b: 'a,
+    {
+        self.context
+            .ref_cdict(dictionary.as_cdict())
+            .map_err(map_error_code)?;
+
+        Ok(())
+    }
+
+    /// Compress a single block of data to the given destination buffer.
+    ///
+    /// Returns the number of bytes written, or an error if something happened
+    /// (for instance if the destination buffer was too small).
+    ///
+    /// A level of `0` uses zstd's default (currently `3`).
+    pub fn compress_to_buffer<C: zstd_safe::WriteBuf + ?Sized>(
+        &mut self,
+        source: &[u8],
+        destination: &mut C,
+    ) -> io::Result<usize> {
+        self.context
+            .compress2(destination, source)
+            .map_err(map_error_code)
+    }
+
+    /// Compresses a block of data and returns the compressed result.
+    ///
+    /// A level of `0` uses zstd's default (currently `3`).
+    pub fn compress(&mut self, data: &[u8]) -> io::Result<Vec<u8>> {
+        // We allocate a big buffer, slightly larger than the input data.
+        let buffer_len = zstd_safe::compress_bound(data.len());
+        let mut buffer = Vec::with_capacity(buffer_len);
+
+        self.compress_to_buffer(data, &mut buffer)?;
+
+        // Should we shrink the vec? Meh, let the user do it if they want.
+        Ok(buffer)
+    }
+
+    /// Gives mutable access to the internal context.
+    pub fn context_mut(&mut self) -> &mut zstd_safe::CCtx<'a> {
+        &mut self.context
+    }
+
+    /// Sets a compression parameter for this compressor.
+    pub fn set_parameter(
+        &mut self,
+        parameter: zstd_safe::CParameter,
+    ) -> io::Result<()> {
+        self.context
+            .set_parameter(parameter)
+            .map_err(map_error_code)?;
+        Ok(())
+    }
+
+    crate::encoder_parameters!();
+}
+
+fn _assert_traits() {
+    fn _assert_send<T: Send>(_: T) {}
+
+    _assert_send(Compressor::new(0));
+}
diff --git a/src/bulk/decompressor.rs b/src/bulk/decompressor.rs
new file mode 100644 (file)
index 0000000..5e056d5
--- /dev/null
@@ -0,0 +1,151 @@
+use crate::map_error_code;
+
+#[cfg(feature = "experimental")]
+use std::convert::TryInto;
+use std::io;
+use zstd_safe;
+
+/// Allows decompressing multiple blocks of data independently.
+///
+/// This reduces memory usage compared to calling `decompress` multiple times.
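+///
+/// A minimal usage sketch, pairing with the bulk compressor:
+///
+/// ```
+/// let compressed = zstd::bulk::compress(b"data", 1).unwrap();
+/// let mut decompressor = zstd::bulk::Decompressor::new().unwrap();
+/// let restored = decompressor.decompress(&compressed, 1024).unwrap();
+/// assert_eq!(&restored[..], &b"data"[..]);
+/// ```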
+#[derive(Default)]
+pub struct Decompressor<'a> {
+    context: zstd_safe::DCtx<'a>,
+}
+
+impl Decompressor<'static> {
+    /// Creates a new zstd decompressor.
+    pub fn new() -> io::Result<Self> {
+        Self::with_dictionary(&[])
+    }
+
+    /// Creates a new zstd decompressor, using the given dictionary.
+    pub fn with_dictionary(dictionary: &[u8]) -> io::Result<Self> {
+        let mut decompressor = Self::default();
+
+        decompressor.set_dictionary(dictionary)?;
+
+        Ok(decompressor)
+    }
+}
+
+impl<'a> Decompressor<'a> {
+    /// Creates a new decompressor using an existing `DecoderDictionary`.
+    ///
+    /// Note that using a dictionary means that compression will need to use
+    /// the same dictionary.
+    pub fn with_prepared_dictionary<'b>(
+        dictionary: &'a crate::dict::DecoderDictionary<'b>,
+    ) -> io::Result<Self>
+    where
+        'b: 'a,
+    {
+        let mut decompressor = Self::default();
+
+        decompressor.set_prepared_dictionary(dictionary)?;
+
+        Ok(decompressor)
+    }
+
+    /// Changes the dictionary used by this decompressor.
+    ///
+    /// Will affect future decompression jobs.
+    ///
+    /// Note that using a dictionary means that compression will need to use
+    /// the same dictionary.
+    pub fn set_dictionary(&mut self, dictionary: &[u8]) -> io::Result<()> {
+        self.context
+            .load_dictionary(dictionary)
+            .map_err(map_error_code)?;
+
+        Ok(())
+    }
+
+    /// Changes the dictionary used by this decompressor.
+    ///
+    /// Note that using a dictionary means that compression will need to use
+    /// the same dictionary.
+    pub fn set_prepared_dictionary<'b>(
+        &mut self,
+        dictionary: &'a crate::dict::DecoderDictionary<'b>,
+    ) -> io::Result<()>
+    where
+        'b: 'a,
+    {
+        self.context
+            .ref_ddict(dictionary.as_ddict())
+            .map_err(map_error_code)?;
+
+        Ok(())
+    }
+
+    /// Decompress a single block of data to the given destination buffer.
+    ///
+    /// Returns the number of bytes written, or an error if something happened
+    /// (for instance if the destination buffer was too small).
+    pub fn decompress_to_buffer<C: zstd_safe::WriteBuf + ?Sized>(
+        &mut self,
+        source: &[u8],
+        destination: &mut C,
+    ) -> io::Result<usize> {
+        self.context
+            .decompress(destination, source)
+            .map_err(map_error_code)
+    }
+
+    /// Decompress a block of data, and return the result in a `Vec<u8>`.
+    ///
+    /// The decompressed data must fit within `capacity` bytes,
+    /// or an error will be returned.
+    pub fn decompress(
+        &mut self,
+        data: &[u8],
+        capacity: usize,
+    ) -> io::Result<Vec<u8>> {
+        let capacity =
+            Self::upper_bound(data).unwrap_or(capacity).min(capacity);
+        let mut buffer = Vec::with_capacity(capacity);
+        self.decompress_to_buffer(data, &mut buffer)?;
+        Ok(buffer)
+    }
+
+    /// Sets a decompression parameter for this decompressor.
+    pub fn set_parameter(
+        &mut self,
+        parameter: zstd_safe::DParameter,
+    ) -> io::Result<()> {
+        self.context
+            .set_parameter(parameter)
+            .map_err(map_error_code)?;
+        Ok(())
+    }
+
+    crate::decoder_parameters!();
+
+    /// Get an upper bound on the decompressed size of data, if available
+    ///
+    /// This can be used to pre-allocate enough capacity for `decompress_to_buffer`
+    /// and is used by `decompress` to ensure that it does not over-allocate if
+    /// you supply a large `capacity`.
+    ///
+    /// Will return `None` if the upper bound cannot be determined or is larger than `usize::MAX`
+    ///
+    /// Note that unless the `experimental` feature is enabled, this will always return `None`.
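+    ///
+    /// A minimal sketch, falling back to a caller-provided cap when no
+    /// bound is available:
+    ///
+    /// ```
+    /// let compressed = zstd::bulk::compress(b"hello", 1).unwrap();
+    /// let capacity =
+    ///     zstd::bulk::Decompressor::upper_bound(&compressed).unwrap_or(1024);
+    /// ```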
+    pub fn upper_bound(_data: &[u8]) -> Option<usize> {
+        #[cfg(feature = "experimental")]
+        {
+            let bound = zstd_safe::decompress_bound(_data).ok()?;
+            bound.try_into().ok()
+        }
+        #[cfg(not(feature = "experimental"))]
+        {
+            None
+        }
+    }
+}
+
+fn _assert_traits() {
+    fn _assert_send<T: Send>(_: T) {}
+
+    _assert_send(Decompressor::new());
+}
diff --git a/src/bulk/mod.rs b/src/bulk/mod.rs
new file mode 100644 (file)
index 0000000..e8382b6
--- /dev/null
@@ -0,0 +1,56 @@
+//! Compress and decompress data in bulk.
+//!
+//! These methods process all the input data at once.
+//! It is therefore best used with relatively small blocks
+//! (like small network packets).
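+//!
+//! A minimal round-trip sketch (the `capacity` argument to `decompress` is
+//! an upper bound on the expected decompressed size):
+//!
+//! ```
+//! let input = b"small network packet";
+//! let compressed = zstd::bulk::compress(input, 1).unwrap();
+//! let decompressed = zstd::bulk::decompress(&compressed, input.len()).unwrap();
+//! assert_eq!(&input[..], &decompressed[..]);
+//! ```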
+
+mod compressor;
+mod decompressor;
+
+#[cfg(test)]
+mod tests;
+
+pub use self::compressor::Compressor;
+pub use self::decompressor::Decompressor;
+
+use std::io;
+
+/// Compresses a single block of data to the given destination buffer.
+///
+/// Returns the number of bytes written, or an error if something happened
+/// (for instance if the destination buffer was too small).
+///
+/// A level of `0` uses zstd's default (currently `3`).
+pub fn compress_to_buffer(
+    source: &[u8],
+    destination: &mut [u8],
+    level: i32,
+) -> io::Result<usize> {
+    Compressor::new(level)?.compress_to_buffer(source, destination)
+}
+
+/// Compresses a block of data and returns the compressed result.
+///
+/// A level of `0` uses zstd's default (currently `3`).
+pub fn compress(data: &[u8], level: i32) -> io::Result<Vec<u8>> {
+    Compressor::new(level)?.compress(data)
+}
+
+/// Decompress a single block of data to the given destination buffer.
+///
+/// Returns the number of bytes written, or an error if something happened
+/// (for instance if the destination buffer was too small).
+pub fn decompress_to_buffer(
+    source: &[u8],
+    destination: &mut [u8],
+) -> io::Result<usize> {
+    Decompressor::new()?.decompress_to_buffer(source, destination)
+}
+
+/// Decompresses a block of data and returns the decompressed result.
+///
+/// The decompressed data must fit within `capacity` bytes,
+/// or an error will be returned.
+pub fn decompress(data: &[u8], capacity: usize) -> io::Result<Vec<u8>> {
+    Decompressor::new()?.decompress(data, capacity)
+}
diff --git a/src/bulk/tests.rs b/src/bulk/tests.rs
new file mode 100644 (file)
index 0000000..408e163
--- /dev/null
@@ -0,0 +1,42 @@
+use super::{compress, decompress};
+
+const TEXT: &str = include_str!("../../assets/example.txt");
+
+#[test]
+fn test_direct() {
+    // Can we include_str!("assets/example.txt")?
+    // It's excluded from the packaging step, so maybe not.
+    crate::test_cycle_unwrap(
+        TEXT.as_bytes(),
+        |data| compress(data, 1),
+        |data| decompress(data, TEXT.len()),
+    );
+}
+
+#[test]
+fn test_stream_compat() {
+    // We can bulk-compress and stream-decode
+    crate::test_cycle_unwrap(
+        TEXT.as_bytes(),
+        |data| compress(data, 1),
+        |data| crate::decode_all(data),
+    );
+
+    // We can stream-encode and bulk-decompress
+    crate::test_cycle_unwrap(
+        TEXT.as_bytes(),
+        |data| crate::encode_all(data, 1),
+        |data| decompress(data, TEXT.len()),
+    );
+}
+
+#[test]
+fn has_content_size() {
+    let compressed = compress(TEXT.as_bytes(), 1).unwrap();
+
+    // Bulk functions by default include the content size.
+    assert_eq!(
+        zstd_safe::get_frame_content_size(&compressed).unwrap(),
+        Some(TEXT.len() as u64)
+    );
+}
diff --git a/src/dict.rs b/src/dict.rs
new file mode 100644 (file)
index 0000000..d101c9f
--- /dev/null
@@ -0,0 +1,219 @@
+//! Train a dictionary from various sources.
+//!
+//! A dictionary can help improve the compression of small files.
+//! The dictionary must be present during decompression,
+//! but can be shared across multiple "similar" files.
+//!
+//! Creating a dictionary using the `zstd` C library,
+//! the `zstd` command-line interface, this library,
+//! or the provided `train` binary should give the same result,
+//! so the outputs are completely compatible.
+//!
+//! To use, see [`Encoder::with_dictionary`] or [`Decoder::with_dictionary`].
+//!
+//! [`Encoder::with_dictionary`]: ../struct.Encoder.html#method.with_dictionary
+//! [`Decoder::with_dictionary`]: ../struct.Decoder.html#method.with_dictionary
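+//!
+//! A minimal sketch, training from in-memory samples (assumes the
+//! `zdict_builder` feature, which is enabled by default):
+//!
+//! ```no_run
+//! let samples: Vec<Vec<u8>> = (0..1000)
+//!     .map(|i| format!("sample number {}", i).into_bytes())
+//!     .collect();
+//! let dict = zstd::dict::from_samples(&samples, 1024).unwrap();
+//! let compressor = zstd::bulk::Compressor::with_dictionary(3, &dict).unwrap();
+//! ```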
+
+#[cfg(feature = "zdict_builder")]
+use std::io::{self, Read};
+
+pub use zstd_safe::{CDict, DDict};
+
+/// Prepared dictionary for compression
+///
+/// A dictionary can include its own copy of the data (if it is `'static`), or it can merely point
+/// to a separate buffer (if it has another lifetime).
+pub struct EncoderDictionary<'a> {
+    cdict: CDict<'a>,
+}
+
+impl EncoderDictionary<'static> {
+    /// Creates a prepared dictionary for compression.
+    ///
+    /// This will copy the dictionary internally.
+    pub fn copy(dictionary: &[u8], level: i32) -> Self {
+        Self {
+            cdict: zstd_safe::create_cdict(dictionary, level),
+        }
+    }
+}
+
+impl<'a> EncoderDictionary<'a> {
+    #[cfg(feature = "experimental")]
+    #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "experimental")))]
+    /// Create prepared dictionary for compression
+    ///
+    /// A level of `0` uses zstd's default (currently `3`).
+    ///
+    /// Only available with the `experimental` feature. Use `EncoderDictionary::copy` otherwise.
+    pub fn new(dictionary: &'a [u8], level: i32) -> Self {
+        Self {
+            cdict: zstd_safe::CDict::create_by_reference(dictionary, level),
+        }
+    }
+
+    /// Returns reference to `CDict` inner object
+    pub fn as_cdict(&self) -> &CDict<'a> {
+        &self.cdict
+    }
+}
+
+/// Prepared dictionary for decompression
+pub struct DecoderDictionary<'a> {
+    ddict: DDict<'a>,
+}
+
+impl DecoderDictionary<'static> {
+    /// Create a prepared dictionary for decompression.
+    ///
+    /// This will copy the dictionary internally.
+    pub fn copy(dictionary: &[u8]) -> Self {
+        Self {
+            ddict: zstd_safe::DDict::create(dictionary),
+        }
+    }
+}
+
+impl<'a> DecoderDictionary<'a> {
+    #[cfg(feature = "experimental")]
+    #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "experimental")))]
+    /// Create prepared dictionary for decompression
+    ///
+    /// Only available with the `experimental` feature. Use `DecoderDictionary::copy` otherwise.
+    pub fn new(dict: &'a [u8]) -> Self {
+        Self {
+            ddict: zstd_safe::DDict::create_by_reference(dict),
+        }
+    }
+
+    /// Returns reference to `DDict` inner object
+    pub fn as_ddict(&self) -> &DDict<'a> {
+        &self.ddict
+    }
+}
+
+/// Train a dictionary from a big continuous chunk of data.
+///
+/// This is the most efficient way to train a dictionary,
+//! since the data is fed directly into `zstd`.
+#[cfg(feature = "zdict_builder")]
+#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "zdict_builder")))]
+pub fn from_continuous(
+    sample_data: &[u8],
+    sample_sizes: &[usize],
+    max_size: usize,
+) -> io::Result<Vec<u8>> {
+    use crate::map_error_code;
+
+    // Complain if the lengths don't add up to the entire data.
+    if sample_sizes.iter().sum::<usize>() != sample_data.len() {
+        return Err(io::Error::new(
+            io::ErrorKind::Other,
+            "sample sizes don't add up".to_string(),
+        ));
+    }
+
+    let mut result = Vec::with_capacity(max_size);
+    zstd_safe::train_from_buffer(&mut result, sample_data, sample_sizes)
+        .map_err(map_error_code)?;
+    Ok(result)
+}
+
+/// Train a dictionary from multiple samples.
+///
+/// The samples will internally be copied to a single continuous buffer,
+/// so make sure you have enough memory available.
+///
+/// If you need to stretch your system's limits,
+/// [`from_continuous`] directly uses the given slice.
+///
+/// [`from_continuous`]: ./fn.from_continuous.html
+#[cfg(feature = "zdict_builder")]
+#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "zdict_builder")))]
+pub fn from_samples<S: AsRef<[u8]>>(
+    samples: &[S],
+    max_size: usize,
+) -> io::Result<Vec<u8>> {
+    // Copy every sample to a big chunk of memory
+    let data: Vec<_> =
+        samples.iter().flat_map(|s| s.as_ref()).cloned().collect();
+    let sizes: Vec<_> = samples.iter().map(|s| s.as_ref().len()).collect();
+
+    from_continuous(&data, &sizes, max_size)
+}
+
+/// Train a dict from a list of files.
+#[cfg(feature = "zdict_builder")]
+#[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "zdict_builder")))]
+pub fn from_files<I, P>(filenames: I, max_size: usize) -> io::Result<Vec<u8>>
+where
+    P: AsRef<std::path::Path>,
+    I: IntoIterator<Item = P>,
+{
+    use std::fs;
+
+    let mut buffer = Vec::new();
+    let mut sizes = Vec::new();
+
+    for filename in filenames {
+        let mut file = fs::File::open(filename)?;
+        let len = file.read_to_end(&mut buffer)?;
+        sizes.push(len);
+    }
+
+    from_continuous(&buffer, &sizes, max_size)
+}
+
+#[cfg(test)]
+#[cfg(feature = "zdict_builder")]
+mod tests {
+    use std::fs;
+    use std::io;
+    use std::io::Read;
+
+    use walkdir;
+
+    #[test]
+    fn test_dict_training() {
+        // Train a dictionary
+        let paths: Vec<_> = walkdir::WalkDir::new("src")
+            .into_iter()
+            .map(|entry| entry.unwrap())
+            .map(|entry| entry.into_path())
+            .filter(|path| path.to_str().unwrap().ends_with(".rs"))
+            .collect();
+
+        let dict = super::from_files(&paths, 4000).unwrap();
+
+        for path in paths {
+            let mut buffer = Vec::new();
+            let mut file = fs::File::open(path).unwrap();
+            let mut content = Vec::new();
+            file.read_to_end(&mut content).unwrap();
+            io::copy(
+                &mut &content[..],
+                &mut crate::stream::Encoder::with_dictionary(
+                    &mut buffer,
+                    1,
+                    &dict,
+                )
+                .unwrap()
+                .auto_finish(),
+            )
+            .unwrap();
+
+            let mut result = Vec::new();
+            io::copy(
+                &mut crate::stream::Decoder::with_dictionary(
+                    &buffer[..],
+                    &dict[..],
+                )
+                .unwrap(),
+                &mut result,
+            )
+            .unwrap();
+
+            assert_eq!(&content, &result);
+        }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644 (file)
index 0000000..d3c16a3
--- /dev/null
@@ -0,0 +1,78 @@
+//! Rust binding to the [zstd library][zstd].
+//!
+//! This crate provides:
+//!
+//! * An [encoder](stream/write/struct.Encoder.html) to compress data using zstd
+//!   and send the output to another writer.
+//! * A [decoder](stream/read/struct.Decoder.html) to read input data from a `Read`
+//!   and decompress it.
+//! * Convenient functions for common tasks.
+//!
+//! # Example
+//!
+//! ```no_run
+//! use std::io;
+//!
+//! // Uncompress input and print the result.
+//! zstd::stream::copy_decode(io::stdin(), io::stdout()).unwrap();
+//! ```
+//!
+//! [zstd]: https://github.com/facebook/zstd
+#![deny(missing_docs)]
+#![cfg_attr(feature = "doc-cfg", feature(doc_cfg))]
+
+// Re-export the zstd-safe crate.
+pub use zstd_safe;
+
+pub mod bulk;
+pub mod dict;
+
+#[macro_use]
+pub mod stream;
+
+use std::io;
+
+/// Default compression level.
+pub use zstd_safe::CLEVEL_DEFAULT as DEFAULT_COMPRESSION_LEVEL;
+
+/// The accepted range of compression levels.
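+///
+/// A small sketch, clamping a requested level into the supported range:
+///
+/// ```
+/// let range = zstd::compression_level_range();
+/// let level = 42.clamp(*range.start(), *range.end());
+/// assert!(range.contains(&level));
+/// ```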
+pub fn compression_level_range(
+) -> std::ops::RangeInclusive<zstd_safe::CompressionLevel> {
+    zstd_safe::min_c_level()..=zstd_safe::max_c_level()
+}
+
+#[doc(no_inline)]
+pub use crate::stream::{decode_all, encode_all, Decoder, Encoder};
+
+/// Returns the error message as io::Error based on error_code.
+fn map_error_code(code: usize) -> io::Error {
+    let msg = zstd_safe::get_error_name(code);
+    io::Error::new(io::ErrorKind::Other, msg.to_string())
+}
+
+// Some helper functions to write full-cycle tests.
+
+#[cfg(test)]
+fn test_cycle<F, G>(data: &[u8], f: F, g: G)
+where
+    F: Fn(&[u8]) -> Vec<u8>,
+    G: Fn(&[u8]) -> Vec<u8>,
+{
+    let mid = f(data);
+    let end = g(&mid);
+    assert_eq!(data, &end[..]);
+}
+
+#[cfg(test)]
+fn test_cycle_unwrap<F, G>(data: &[u8], f: F, g: G)
+where
+    F: Fn(&[u8]) -> io::Result<Vec<u8>>,
+    G: Fn(&[u8]) -> io::Result<Vec<u8>>,
+{
+    test_cycle(data, |data| f(data).unwrap(), |data| g(data).unwrap())
+}
+
+#[test]
+fn default_compression_level_in_range() {
+    assert!(compression_level_range().contains(&DEFAULT_COMPRESSION_LEVEL));
+}
diff --git a/src/stream/functions.rs b/src/stream/functions.rs
new file mode 100644 (file)
index 0000000..e2aca36
--- /dev/null
@@ -0,0 +1,59 @@
+use std::io;
+
+use super::{Decoder, Encoder};
+
+/// Decompress from the given source as if using a `Decoder`.
+///
+/// The input data must be in the zstd frame format.
+pub fn decode_all<R: io::Read>(source: R) -> io::Result<Vec<u8>> {
+    let mut result = Vec::new();
+    copy_decode(source, &mut result)?;
+    Ok(result)
+}
+
+/// Decompress from the given source as if using a `Decoder`.
+///
+/// Decompressed data will be appended to `destination`.
+pub fn copy_decode<R, W>(source: R, mut destination: W) -> io::Result<()>
+where
+    R: io::Read,
+    W: io::Write,
+{
+    let mut decoder = Decoder::new(source)?;
+    io::copy(&mut decoder, &mut destination)?;
+    Ok(())
+}
+
+/// Compress all data from the given source as if using an `Encoder`.
+///
+/// Result will be in the zstd frame format.
+///
+/// A level of `0` uses zstd's default (currently `3`).
+pub fn encode_all<R: io::Read>(source: R, level: i32) -> io::Result<Vec<u8>> {
+    let mut result = Vec::<u8>::new();
+    copy_encode(source, &mut result, level)?;
+    Ok(result)
+}
+
+/// Compress all data from the given source as if using an `Encoder`.
+///
+/// Compressed data will be appended to `destination`.
+///
+/// A level of `0` uses zstd's default (currently `3`).
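+///
+/// A minimal sketch, compressing into an in-memory `Vec<u8>`:
+///
+/// ```
+/// let mut destination: Vec<u8> = Vec::new();
+/// zstd::stream::copy_encode(&b"example"[..], &mut destination, 0).unwrap();
+/// ```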
+pub fn copy_encode<R, W>(
+    mut source: R,
+    destination: W,
+    level: i32,
+) -> io::Result<()>
+where
+    R: io::Read,
+    W: io::Write,
+{
+    let mut encoder = Encoder::new(destination, level)?;
+    io::copy(&mut source, &mut encoder)?;
+    encoder.finish()?;
+    Ok(())
+}
+
+#[cfg(test)]
+mod tests {}
diff --git a/src/stream/mod.rs b/src/stream/mod.rs
new file mode 100644 (file)
index 0000000..e2c04f6
--- /dev/null
@@ -0,0 +1,211 @@
+//! Compress and decompress Zstd streams.
+//!
+//! Zstd streams are the main way to compress and decompress data.
+//! They are compatible with the `zstd` command-line tool.
+//!
+//! This module provides both `Read` and `Write` interfaces to compressing and
+//! decompressing.
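+//!
+//! A minimal write-side sketch, compressing into an in-memory buffer:
+//!
+//! ```
+//! use std::io::Write;
+//!
+//! let mut encoder = zstd::stream::Encoder::new(Vec::<u8>::new(), 0).unwrap();
+//! encoder.write_all(b"stream me").unwrap();
+//! let compressed = encoder.finish().unwrap();
+//! ```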
+
+pub mod read;
+pub mod write;
+
+mod functions;
+pub mod zio;
+
+#[cfg(test)]
+mod tests;
+
+pub mod raw;
+
+pub use self::functions::{copy_decode, copy_encode, decode_all, encode_all};
+pub use self::read::Decoder;
+pub use self::write::{AutoFinishEncoder, Encoder};
+
+#[doc(hidden)]
+#[macro_export]
+/// Parameter-setters for the decoder. Relies on a `set_parameter` method.
+macro_rules! decoder_parameters {
+    () => {
+        /// Sets the maximum back-reference distance.
+        ///
+        /// The actual maximum distance is going to be `2^log_distance`.
+        ///
+        /// This will need to at least match the value set when compressing.
+        pub fn window_log_max(&mut self, log_distance: u32) -> io::Result<()> {
+            self.set_parameter(zstd_safe::DParameter::WindowLogMax(
+                log_distance,
+            ))
+        }
+
+        #[cfg(feature = "experimental")]
+        #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "experimental")))]
+        /// Enables or disables expecting the 4-byte magic header
+        ///
+        /// Only available with the `experimental` feature.
+        ///
+        /// This will need to match the settings used when compressing.
+        pub fn include_magicbytes(
+            &mut self,
+            include_magicbytes: bool,
+        ) -> io::Result<()> {
+            self.set_parameter(zstd_safe::DParameter::Format(
+                if include_magicbytes {
+                    zstd_safe::FrameFormat::One
+                } else {
+                    zstd_safe::FrameFormat::Magicless
+                },
+            ))
+        }
+    };
+}
+
+#[doc(hidden)]
+#[macro_export]
+/// Common functions for the decoder, both in read and write mode.
+macro_rules! decoder_common {
+    ($readwrite:ident) => {
+        /// Sets a decompression parameter on the decompression stream.
+        pub fn set_parameter(
+            &mut self,
+            parameter: zstd_safe::DParameter,
+        ) -> io::Result<()> {
+            self.$readwrite.operation_mut().set_parameter(parameter)
+        }
+
+        $crate::decoder_parameters!();
+    };
+}
+
+#[doc(hidden)]
+#[macro_export]
+/// Parameter-setters for the encoder. Relies on a `set_parameter` method.
+macro_rules! encoder_parameters {
+    () => {
+        /// Controls whether zstd should include a content checksum at the end
+        /// of each frame.
+        pub fn include_checksum(
+            &mut self,
+            include_checksum: bool,
+        ) -> io::Result<()> {
+            self.set_parameter(zstd_safe::CParameter::ChecksumFlag(
+                include_checksum,
+            ))
+        }
+
+        /// Enables multithreaded compression
+        ///
+        /// * If `n_workers == 0` (default), then multithreading will be
+        ///   disabled.
+        /// * If `n_workers >= 1`, then compression will be done in separate
+        ///   threads.
+        ///
+        /// So even `n_workers = 1` may increase performance by separating
+        /// IO and compression.
+        ///
+        /// Note: This is only available if the `zstdmt` cargo feature is activated.
+        #[cfg(feature = "zstdmt")]
+        #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "zstdmt")))]
+        pub fn multithread(&mut self, n_workers: u32) -> io::Result<()> {
+            self.set_parameter(zstd_safe::CParameter::NbWorkers(n_workers))
+        }
+
+        /// Enables or disables storing of the dict id.
+        ///
+        /// Defaults to true. If false, the behaviour of decoding with a wrong
+        /// dictionary is undefined.
+        pub fn include_dictid(
+            &mut self,
+            include_dictid: bool,
+        ) -> io::Result<()> {
+            self.set_parameter(zstd_safe::CParameter::DictIdFlag(
+                include_dictid,
+            ))
+        }
+
+        /// Enables or disables storing of the content size.
+        ///
+        /// Note that this only has an effect if the size is given with `set_pledged_src_size`.
+        pub fn include_contentsize(
+            &mut self,
+            include_contentsize: bool,
+        ) -> io::Result<()> {
+            self.set_parameter(zstd_safe::CParameter::ContentSizeFlag(
+                include_contentsize,
+            ))
+        }
+
+        /// Enables or disables long-distance matching.
+        pub fn long_distance_matching(
+            &mut self,
+            long_distance_matching: bool,
+        ) -> io::Result<()> {
+            self.set_parameter(
+                zstd_safe::CParameter::EnableLongDistanceMatching(
+                    long_distance_matching,
+                ),
+            )
+        }
+
+        /// Sets the maximum back-reference distance.
+        ///
+        /// The actual maximum distance is going to be `2^log_distance`.
+        ///
+        /// Note that decompression will need to use at least the same setting.
+        pub fn window_log(&mut self, log_distance: u32) -> io::Result<()> {
+            self.set_parameter(zstd_safe::CParameter::WindowLog(log_distance))
+        }
+
+        #[cfg(feature = "experimental")]
+        #[cfg_attr(feature = "doc-cfg", doc(cfg(feature = "experimental")))]
+        /// Enables or disables the magic bytes at the beginning of each frame.
+        ///
+        /// If disabled, include_magicbytes must also be called on the decoder.
+        ///
+        /// Only available with the `experimental` feature.
+        ///
+        /// Note that decompression will need to use the same setting.
+        pub fn include_magicbytes(
+            &mut self,
+            include_magicbytes: bool,
+        ) -> io::Result<()> {
+            self.set_parameter(zstd_safe::CParameter::Format(
+                if include_magicbytes {
+                    zstd_safe::FrameFormat::One
+                } else {
+                    zstd_safe::FrameFormat::Magicless
+                },
+            ))
+        }
+    };
+}
+
+#[doc(hidden)]
+#[macro_export]
+/// Common functions for the encoder, both in read and write mode.
+macro_rules! encoder_common {
+    ($readwrite:ident) => {
+        /// Sets the given zstd compression parameter.
+        pub fn set_parameter(
+            &mut self,
+            parameter: zstd_safe::CParameter,
+        ) -> io::Result<()> {
+            self.$readwrite.operation_mut().set_parameter(parameter)
+        }
+
+        /// Sets the expected size of the input.
+        ///
+        /// This affects the compression effectiveness.
+        ///
+        /// It is an error to give an incorrect size (an error will be returned when closing the
+        /// stream if the size does not match what was pledged).
+        ///
+        /// Giving a `None` size means the size is unknown (this is the default).
+        pub fn set_pledged_src_size(
+            &mut self,
+            size: Option<u64>,
+        ) -> io::Result<()> {
+            self.$readwrite.operation_mut().set_pledged_src_size(size)
+        }
+
+        $crate::encoder_parameters!();
+    };
+}
diff --git a/src/stream/raw.rs b/src/stream/raw.rs
new file mode 100644 (file)
index 0000000..ee1d292
--- /dev/null
@@ -0,0 +1,374 @@
+//! Raw in-memory stream compression/decompression.
+//!
+//! This module defines a `Decoder` and an `Encoder` to decode/encode streams
+//! of data using buffers.
+//!
+//! They are mostly thin wrappers around `zstd_safe::{DCtx, CCtx}`.
+use std::io;
+
+pub use zstd_safe::{CParameter, DParameter, InBuffer, OutBuffer, WriteBuf};
+
+use crate::dict::{DecoderDictionary, EncoderDictionary};
+use crate::map_error_code;
+
+/// Represents an abstract compression/decompression operation.
+///
+/// This trait covers both `Encoder` and `Decoder`.
+pub trait Operation {
+    /// Performs a single step of this operation.
+    ///
+    /// Should return a hint for the next input size.
+    ///
+    /// If the result is `Ok(0)`, it may indicate that a frame was just
+    /// finished.
+    fn run<C: WriteBuf + ?Sized>(
+        &mut self,
+        input: &mut InBuffer<'_>,
+        output: &mut OutBuffer<'_, C>,
+    ) -> io::Result<usize>;
+
+    /// Performs a single step of this operation.
+    ///
+    /// This is a convenience wrapper around `Operation::run` if you don't
+    /// want to deal with `InBuffer`/`OutBuffer`.
+    fn run_on_buffers(
+        &mut self,
+        input: &[u8],
+        output: &mut [u8],
+    ) -> io::Result<Status> {
+        let mut input = InBuffer::around(input);
+        let mut output = OutBuffer::around(output);
+
+        let remaining = self.run(&mut input, &mut output)?;
+
+        Ok(Status {
+            remaining,
+            bytes_read: input.pos(),
+            bytes_written: output.pos(),
+        })
+    }
+
+    /// Flushes any internal buffer, if any.
+    ///
+    /// Returns the number of bytes still in the buffer.
+    /// To flush entirely, keep calling until it returns `Ok(0)`.
+    fn flush<C: WriteBuf + ?Sized>(
+        &mut self,
+        output: &mut OutBuffer<'_, C>,
+    ) -> io::Result<usize> {
+        let _ = output;
+        Ok(0)
+    }
+
+    /// Prepares the operation for a new frame.
+    ///
+    /// This is hopefully cheaper than creating a new operation.
+    fn reinit(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+
+    /// Finishes the operation, writing any footer if necessary.
+    ///
+    /// Returns the number of bytes still to write.
+    ///
+    /// Keep calling this method until it returns `Ok(0)`,
+    /// then don't call it again.
+    fn finish<C: WriteBuf + ?Sized>(
+        &mut self,
+        output: &mut OutBuffer<'_, C>,
+        finished_frame: bool,
+    ) -> io::Result<usize> {
+        let _ = output;
+        let _ = finished_frame;
+        Ok(0)
+    }
+}
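+
+// Editorial sketch (not part of the upstream sources): a one-shot use of
+// `run_on_buffers` from the trait above, decoding a frame produced by the
+// crate-level `encode_all` helper.
+#[cfg(test)]
+mod operation_example {
+    use super::{Decoder, Operation};
+
+    #[test]
+    fn decode_with_run_on_buffers() {
+        let compressed = crate::encode_all(&b"hello"[..], 1).unwrap();
+
+        let mut decoder = Decoder::new().unwrap();
+        let mut out = [0u8; 64];
+        let status = decoder.run_on_buffers(&compressed, &mut out).unwrap();
+
+        // A `remaining` hint of 0 means the frame was fully decoded.
+        assert_eq!(status.remaining, 0);
+        assert_eq!(&out[..status.bytes_written], &b"hello"[..]);
+    }
+}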
+
+/// Dummy operation that just copies its input to the output.
+pub struct NoOp;
+
+impl Operation for NoOp {
+    fn run<C: WriteBuf + ?Sized>(
+        &mut self,
+        input: &mut InBuffer<'_>,
+        output: &mut OutBuffer<'_, C>,
+    ) -> io::Result<usize> {
+        // Skip the part of the input that was already consumed.
+        let src = &input.src[input.pos..];
+        // Safe because `output.pos() <= output.dst.capacity()`.
+        let dst = unsafe { output.dst.as_mut_ptr().add(output.pos()) };
+
+        // Ignore anything past the end
+        let len =
+            usize::min(src.len(), output.dst.capacity() - output.pos());
+        let src = &src[..len];
+
+        // Safe because:
+        // * `len` is at most the input length, and at most the remaining
+        //   output capacity past `output.pos()`.
+        // * `src` and `dst` do not overlap because we have `&mut` to each.
+        unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), dst, len) };
+        input.set_pos(input.pos() + len);
+        unsafe { output.set_pos(output.pos() + len) };
+
+        Ok(0)
+    }
+}
+
+/// Describes the result of an operation.
+pub struct Status {
+    /// Number of bytes expected for next input.
+    ///
+    /// This is just a hint.
+    pub remaining: usize,
+
+    /// Number of bytes read from the input.
+    pub bytes_read: usize,
+
+    /// Number of bytes written to the output.
+    pub bytes_written: usize,
+}
+
+/// An in-memory decoder for streams of data.
+pub struct Decoder<'a> {
+    context: zstd_safe::DCtx<'a>,
+}
+
+impl Decoder<'static> {
+    /// Creates a new decoder.
+    pub fn new() -> io::Result<Self> {
+        Self::with_dictionary(&[])
+    }
+
+    /// Creates a new decoder initialized with the given dictionary.
+    pub fn with_dictionary(dictionary: &[u8]) -> io::Result<Self> {
+        let mut context = zstd_safe::DCtx::create();
+        context.init().map_err(map_error_code)?;
+        context
+            .load_dictionary(dictionary)
+            .map_err(map_error_code)?;
+        Ok(Decoder { context })
+    }
+}
+
+impl<'a> Decoder<'a> {
+    /// Creates a new decoder, using an existing `DecoderDictionary`.
+    pub fn with_prepared_dictionary<'b>(
+        dictionary: &DecoderDictionary<'b>,
+    ) -> io::Result<Self>
+    where
+        'b: 'a,
+    {
+        let mut context = zstd_safe::DCtx::create();
+        context
+            .ref_ddict(dictionary.as_ddict())
+            .map_err(map_error_code)?;
+        Ok(Decoder { context })
+    }
+
+    /// Sets a decompression parameter for this decoder.
+    pub fn set_parameter(&mut self, parameter: DParameter) -> io::Result<()> {
+        self.context
+            .set_parameter(parameter)
+            .map_err(map_error_code)?;
+        Ok(())
+    }
+}
+
+impl Operation for Decoder<'_> {
+    fn run<C: WriteBuf + ?Sized>(
+        &mut self,
+        input: &mut InBuffer<'_>,
+        output: &mut OutBuffer<'_, C>,
+    ) -> io::Result<usize> {
+        self.context
+            .decompress_stream(output, input)
+            .map_err(map_error_code)
+    }
+
+    fn flush<C: WriteBuf + ?Sized>(
+        &mut self,
+        output: &mut OutBuffer<'_, C>,
+    ) -> io::Result<usize> {
+        // To flush, we just offer no additional input.
+        self.run(&mut InBuffer::around(&[]), output)?;
+
+        // We don't _know_ how much decompressed data is still in the buffer.
+        if output.pos() < output.dst.capacity() {
+            // We only know when there's none (the output buffer is not full).
+            Ok(0)
+        } else {
+            // Otherwise, pretend there's still "1 byte" remaining.
+            Ok(1)
+        }
+    }
+
+    fn reinit(&mut self) -> io::Result<()> {
+        self.context
+            .reset(zstd_safe::ResetDirective::SessionOnly)
+            .map_err(map_error_code)?;
+        Ok(())
+    }
+
+    fn finish<C: WriteBuf + ?Sized>(
+        &mut self,
+        _output: &mut OutBuffer<'_, C>,
+        finished_frame: bool,
+    ) -> io::Result<usize> {
+        if finished_frame {
+            Ok(0)
+        } else {
+            Err(io::Error::new(
+                io::ErrorKind::UnexpectedEof,
+                "incomplete frame",
+            ))
+        }
+    }
+}
+
+/// An in-memory encoder for streams of data.
+pub struct Encoder<'a> {
+    context: zstd_safe::CCtx<'a>,
+}
+
+impl Encoder<'static> {
+    /// Creates a new encoder.
+    pub fn new(level: i32) -> io::Result<Self> {
+        Self::with_dictionary(level, &[])
+    }
+
+    /// Creates a new encoder initialized with the given dictionary.
+    pub fn with_dictionary(level: i32, dictionary: &[u8]) -> io::Result<Self> {
+        let mut context = zstd_safe::CCtx::create();
+
+        context
+            .set_parameter(CParameter::CompressionLevel(level))
+            .map_err(map_error_code)?;
+
+        context
+            .load_dictionary(dictionary)
+            .map_err(map_error_code)?;
+
+        Ok(Encoder { context })
+    }
+}
+
+impl<'a> Encoder<'a> {
+    /// Creates a new encoder using an existing `EncoderDictionary`.
+    pub fn with_prepared_dictionary<'b>(
+        dictionary: &EncoderDictionary<'b>,
+    ) -> io::Result<Self>
+    where
+        'b: 'a,
+    {
+        let mut context = zstd_safe::CCtx::create();
+        context
+            .ref_cdict(dictionary.as_cdict())
+            .map_err(map_error_code)?;
+        Ok(Encoder { context })
+    }
+
+    /// Sets a compression parameter for this encoder.
+    pub fn set_parameter(&mut self, parameter: CParameter) -> io::Result<()> {
+        self.context
+            .set_parameter(parameter)
+            .map_err(map_error_code)?;
+        Ok(())
+    }
+
+    /// Sets the size of the input expected by zstd.
+    ///
+    /// May affect compression ratio.
+    ///
+    /// It is an error to give an incorrect size (an error _will_ be returned when closing the
+    /// stream).
+    ///
+    /// If `None` is given, it assumes the size is not known (this is the default).
+    pub fn set_pledged_src_size(
+        &mut self,
+        pledged_src_size: Option<u64>,
+    ) -> io::Result<()> {
+        self.context
+            .set_pledged_src_size(pledged_src_size)
+            .map_err(map_error_code)?;
+        Ok(())
+    }
+}
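+
+// Editorial sketch (not part of the upstream sources): pledging a source
+// size and then providing a different amount is reported as an error when
+// the frame is finished, as documented on `set_pledged_src_size` above.
+#[cfg(test)]
+mod pledged_size_example {
+    use super::{Encoder, InBuffer, Operation, OutBuffer};
+
+    #[test]
+    fn wrong_pledge_fails_on_finish() {
+        let mut encoder = Encoder::new(1).unwrap();
+        encoder.set_pledged_src_size(Some(42)).unwrap();
+
+        let mut buffer = [0u8; 256];
+        let mut output = OutBuffer::around(&mut buffer[..]);
+        // Feed only 5 bytes even though 42 were pledged.
+        encoder
+            .run(&mut InBuffer::around(b"hello"), &mut output)
+            .unwrap();
+        assert!(encoder.finish(&mut output, false).is_err());
+    }
+}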
+
+impl<'a> Operation for Encoder<'a> {
+    fn run<C: WriteBuf + ?Sized>(
+        &mut self,
+        input: &mut InBuffer<'_>,
+        output: &mut OutBuffer<'_, C>,
+    ) -> io::Result<usize> {
+        self.context
+            .compress_stream(output, input)
+            .map_err(map_error_code)
+    }
+
+    fn flush<C: WriteBuf + ?Sized>(
+        &mut self,
+        output: &mut OutBuffer<'_, C>,
+    ) -> io::Result<usize> {
+        self.context.flush_stream(output).map_err(map_error_code)
+    }
+
+    fn finish<C: WriteBuf + ?Sized>(
+        &mut self,
+        output: &mut OutBuffer<'_, C>,
+        _finished_frame: bool,
+    ) -> io::Result<usize> {
+        self.context.end_stream(output).map_err(map_error_code)
+    }
+
+    fn reinit(&mut self) -> io::Result<()> {
+        self.context
+            .reset(zstd_safe::ResetDirective::SessionOnly)
+            .map_err(map_error_code)?;
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    // This requires impl for [u8; N] which is currently behind a feature.
+    #[cfg(feature = "arrays")]
+    #[test]
+    fn test_cycle() {
+        use super::{Decoder, Encoder, InBuffer, Operation, OutBuffer};
+
+        let mut encoder = Encoder::new(1).unwrap();
+        let mut decoder = Decoder::new().unwrap();
+
+        // Step 1: compress
+        let mut input = InBuffer::around(b"AbcdefAbcdefabcdef");
+
+        let mut output = [0u8; 128];
+        let mut output = OutBuffer::around(&mut output);
+
+        loop {
+            encoder.run(&mut input, &mut output).unwrap();
+
+            if input.pos == input.src.len() {
+                break;
+            }
+        }
+        encoder.finish(&mut output, true).unwrap();
+
+        let initial_data = input.src;
+
+        // Step 2: decompress
+        let mut input = InBuffer::around(output.as_slice());
+        let mut output = [0u8; 128];
+        let mut output = OutBuffer::around(&mut output);
+
+        loop {
+            decoder.run(&mut input, &mut output).unwrap();
+
+            if input.pos == input.src.len() {
+                break;
+            }
+        }
+
+        assert_eq!(initial_data, output.as_slice());
+    }
+}
diff --git a/src/stream/read/mod.rs b/src/stream/read/mod.rs
new file mode 100644 (file)
index 0000000..a3a947b
--- /dev/null
@@ -0,0 +1,211 @@
+//! Implements the pull-based [`Read`] trait for both compressing and decompressing.
+use std::io::{self, BufRead, BufReader, Read};
+
+use crate::dict::{DecoderDictionary, EncoderDictionary};
+use crate::stream::{raw, zio};
+use zstd_safe;
+
+#[cfg(test)]
+mod tests;
+
+/// A decoder that decompresses input data from another `Read`.
+///
+/// This allows reading a stream of compressed data
+/// (good for files or heavy network streams).
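+///
+/// # Examples
+///
+/// A minimal sketch (an editorial addition, not upstream documentation),
+/// assuming the crate is consumed as `zstd` and `"data.zst"` is a
+/// placeholder path:
+///
+/// ```no_run
+/// use std::io::Read;
+///
+/// let file = std::fs::File::open("data.zst").unwrap();
+/// let mut decoder = zstd::stream::read::Decoder::new(file).unwrap();
+/// let mut decompressed = Vec::new();
+/// decoder.read_to_end(&mut decompressed).unwrap();
+/// ```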
+pub struct Decoder<'a, R: BufRead> {
+    reader: zio::Reader<R, raw::Decoder<'a>>,
+}
+
+/// An encoder that compresses input data from another `Read`.
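+///
+/// # Examples
+///
+/// A minimal sketch (an editorial addition, not upstream documentation):
+/// the compressed bytes come out of `read()`:
+///
+/// ```
+/// use std::io::Read;
+///
+/// let input = &b"some data to compress"[..];
+/// let mut encoder = zstd::stream::read::Encoder::new(input, 1).unwrap();
+/// let mut compressed = Vec::new();
+/// encoder.read_to_end(&mut compressed).unwrap();
+///
+/// assert_eq!(zstd::decode_all(&compressed[..]).unwrap(), input);
+/// ```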
+pub struct Encoder<'a, R: BufRead> {
+    reader: zio::Reader<R, raw::Encoder<'a>>,
+}
+
+impl<R: Read> Decoder<'static, BufReader<R>> {
+    /// Creates a new decoder.
+    pub fn new(reader: R) -> io::Result<Self> {
+        let buffer_size = zstd_safe::DCtx::in_size();
+
+        Self::with_buffer(BufReader::with_capacity(buffer_size, reader))
+    }
+}
+
+impl<R: BufRead> Decoder<'static, R> {
+    /// Creates a new decoder around a `BufRead`.
+    pub fn with_buffer(reader: R) -> io::Result<Self> {
+        Self::with_dictionary(reader, &[])
+    }
+
+    /// Creates a new decoder, using an existing dictionary.
+    ///
+    /// The dictionary must be the same as the one used during compression.
+    pub fn with_dictionary(reader: R, dictionary: &[u8]) -> io::Result<Self> {
+        let decoder = raw::Decoder::with_dictionary(dictionary)?;
+        let reader = zio::Reader::new(reader, decoder);
+
+        Ok(Decoder { reader })
+    }
+}
+
+impl<'a, R: BufRead> Decoder<'a, R> {
+    /// Sets this `Decoder` to stop after the first frame.
+    ///
+    /// By default, it keeps concatenating frames until EOF is reached.
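+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch (an editorial addition, not upstream documentation):
+    ///
+    /// ```
+    /// use std::io::Read;
+    ///
+    /// let mut compressed = zstd::encode_all(&b"foo"[..], 1).unwrap();
+    /// compressed.extend(zstd::encode_all(&b"bar"[..], 1).unwrap());
+    ///
+    /// let mut decoder = zstd::stream::read::Decoder::new(&compressed[..])
+    ///     .unwrap()
+    ///     .single_frame();
+    /// let mut out = Vec::new();
+    /// decoder.read_to_end(&mut out).unwrap();
+    ///
+    /// // Decoding stopped at the end of the first frame.
+    /// assert_eq!(out, b"foo");
+    /// ```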
+    #[must_use]
+    pub fn single_frame(mut self) -> Self {
+        self.reader.set_single_frame();
+        self
+    }
+
+    /// Creates a new decoder, using an existing `DecoderDictionary`.
+    ///
+    /// The dictionary must be the same as the one used during compression.
+    pub fn with_prepared_dictionary<'b>(
+        reader: R,
+        dictionary: &DecoderDictionary<'b>,
+    ) -> io::Result<Self>
+    where
+        'b: 'a,
+    {
+        let decoder = raw::Decoder::with_prepared_dictionary(dictionary)?;
+        let reader = zio::Reader::new(reader, decoder);
+
+        Ok(Decoder { reader })
+    }
+
+    /// Recommendation for the size of the output buffer.
+    pub fn recommended_output_size() -> usize {
+        zstd_safe::DCtx::out_size()
+    }
+
+    /// Acquire a reference to the underlying reader.
+    pub fn get_ref(&self) -> &R {
+        self.reader.reader()
+    }
+
+    /// Acquire a mutable reference to the underlying reader.
+    ///
+    /// Note that mutation of the reader may result in surprising results if
+    /// this decoder continues to be used.
+    pub fn get_mut(&mut self) -> &mut R {
+        self.reader.reader_mut()
+    }
+
+    /// Return the inner `Read`.
+    ///
+    /// Calling `finish()` is not *required* after reading a stream -
+    /// just use it if you need to get the `Read` back.
+    pub fn finish(self) -> R {
+        self.reader.into_inner()
+    }
+
+    crate::decoder_common!(reader);
+}
+
+impl<R: BufRead> Read for Decoder<'_, R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.reader.read(buf)
+    }
+}
+
+impl<R: Read> Encoder<'static, BufReader<R>> {
+    /// Creates a new encoder.
+    pub fn new(reader: R, level: i32) -> io::Result<Self> {
+        let buffer_size = zstd_safe::CCtx::in_size();
+
+        Self::with_buffer(BufReader::with_capacity(buffer_size, reader), level)
+    }
+}
+
+impl<R: BufRead> Encoder<'static, R> {
+    /// Creates a new encoder around a `BufRead`.
+    pub fn with_buffer(reader: R, level: i32) -> io::Result<Self> {
+        Self::with_dictionary(reader, level, &[])
+    }
+
+    /// Creates a new encoder, using an existing dictionary.
+    ///
+    /// The same dictionary must then be used during decompression.
+    pub fn with_dictionary(
+        reader: R,
+        level: i32,
+        dictionary: &[u8],
+    ) -> io::Result<Self> {
+        let encoder = raw::Encoder::with_dictionary(level, dictionary)?;
+        let reader = zio::Reader::new(reader, encoder);
+
+        Ok(Encoder { reader })
+    }
+}
+
+impl<'a, R: BufRead> Encoder<'a, R> {
+    /// Creates a new encoder, using an existing `EncoderDictionary`.
+    ///
+    /// The same dictionary must then be used during decompression.
+    pub fn with_prepared_dictionary<'b>(
+        reader: R,
+        dictionary: &EncoderDictionary<'b>,
+    ) -> io::Result<Self>
+    where
+        'b: 'a,
+    {
+        let encoder = raw::Encoder::with_prepared_dictionary(dictionary)?;
+        let reader = zio::Reader::new(reader, encoder);
+
+        Ok(Encoder { reader })
+    }
+
+    /// Recommendation for the size of the output buffer.
+    pub fn recommended_output_size() -> usize {
+        zstd_safe::CCtx::out_size()
+    }
+
+    /// Acquire a reference to the underlying reader.
+    pub fn get_ref(&self) -> &R {
+        self.reader.reader()
+    }
+
+    /// Acquire a mutable reference to the underlying reader.
+    ///
+    /// Note that mutation of the reader may result in surprising results if
+    /// this encoder continues to be used.
+    pub fn get_mut(&mut self) -> &mut R {
+        self.reader.reader_mut()
+    }
+
+    /// Flush any internal buffer.
+    ///
+    /// This ensures all input consumed so far is compressed.
+    ///
+    /// Since it prevents bundling currently buffered data with future input,
+    /// it may affect compression ratio.
+    ///
+    /// * Returns the number of bytes written to `out`.
+    /// * Returns `Ok(0)` when everything has been flushed.
+    pub fn flush(&mut self, out: &mut [u8]) -> io::Result<usize> {
+        self.reader.flush(out)
+    }
+
+    /// Return the inner `Read`.
+    ///
+    /// Calling `finish()` is not *required* after reading a stream -
+    /// just use it if you need to get the `Read` back.
+    pub fn finish(self) -> R {
+        self.reader.into_inner()
+    }
+
+    crate::encoder_common!(reader);
+}
+
+impl<R: BufRead> Read for Encoder<'_, R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.reader.read(buf)
+    }
+}
+
+fn _assert_traits() {
+    use std::io::Cursor;
+
+    fn _assert_send<T: Send>(_: T) {}
+
+    _assert_send(Decoder::new(Cursor::new(Vec::new())));
+    _assert_send(Encoder::new(Cursor::new(Vec::new()), 1));
+}
diff --git a/src/stream/read/tests.rs b/src/stream/read/tests.rs
new file mode 100644 (file)
index 0000000..49190f7
--- /dev/null
@@ -0,0 +1,27 @@
+use crate::stream::read::{Decoder, Encoder};
+use std::io::Read;
+
+#[test]
+fn test_error_handling() {
+    let invalid_input = b"Abcdefghabcdefgh";
+
+    let mut decoder = Decoder::new(&invalid_input[..]).unwrap();
+    let output = decoder.read_to_end(&mut Vec::new());
+
+    assert!(output.is_err());
+}
+
+#[test]
+fn test_cycle() {
+    let input = b"Abcdefghabcdefgh";
+
+    let mut encoder = Encoder::new(&input[..], 1).unwrap();
+    let mut buffer = Vec::new();
+    encoder.read_to_end(&mut buffer).unwrap();
+
+    let mut decoder = Decoder::new(&buffer[..]).unwrap();
+    let mut buffer = Vec::new();
+    decoder.read_to_end(&mut buffer).unwrap();
+
+    assert_eq!(input, &buffer[..]);
+}
diff --git a/src/stream/tests.rs b/src/stream/tests.rs
new file mode 100644 (file)
index 0000000..7a77a67
--- /dev/null
@@ -0,0 +1,269 @@
+use super::{copy_encode, decode_all, encode_all};
+use super::{Decoder, Encoder};
+
+use partial_io::{PartialOp, PartialWrite};
+
+use std::io;
+use std::iter;
+
+#[test]
+fn test_end_of_frame() {
+    use std::io::{Read, Write};
+
+    let mut enc = Encoder::new(Vec::new(), 1).unwrap();
+    enc.write_all(b"foo").unwrap();
+    let mut compressed = enc.finish().unwrap();
+
+    // Add footer/whatever to underlying storage.
+    compressed.push(0);
+
+    // Drain zstd stream until end-of-frame.
+    let mut dec = Decoder::new(&compressed[..]).unwrap().single_frame();
+    let mut buf = Vec::new();
+    dec.read_to_end(&mut buf).unwrap();
+    assert_eq!(&buf, b"foo", "Error decoding a single frame.");
+}
+
+#[test]
+fn test_concatenated_frames() {
+    let mut buffer = Vec::new();
+    copy_encode(&b"foo"[..], &mut buffer, 1).unwrap();
+    copy_encode(&b"bar"[..], &mut buffer, 2).unwrap();
+    copy_encode(&b"baz"[..], &mut buffer, 3).unwrap();
+
+    assert_eq!(
+        &decode_all(&buffer[..]).unwrap(),
+        b"foobarbaz",
+        "Error decoding concatenated frames."
+    );
+}
+
+#[test]
+fn test_flush() {
+    use std::io::Write;
+
+    let buf = Vec::new();
+    let mut z = Encoder::new(buf, 19).unwrap();
+
+    z.write_all(b"hello").unwrap();
+
+    z.flush().unwrap(); // Flushing mid-stream must not corrupt the output.
+    let buf = z.finish().unwrap();
+
+    let s = decode_all(&buf[..]).unwrap();
+    assert_eq!(s, b"hello", "Error decoding after flush.");
+}
+
+#[test]
+fn test_try_finish() {
+    use std::io::Write;
+    let mut z = setup_try_finish();
+
+    z.get_mut().set_ops(iter::repeat(PartialOp::Unlimited));
+
+    // flush() should continue to work even though write() doesn't.
+    z.flush().unwrap();
+
+    let buf = match z.try_finish() {
+        Ok(buf) => buf.into_inner(),
+        Err((_z, e)) => panic!("try_finish failed with {:?}", e),
+    };
+
+    // Make sure the multiple try_finish calls didn't screw up the internal
+    // buffer and continued to produce valid compressed data.
+    assert_eq!(&decode_all(&buf[..]).unwrap(), b"hello", "Error decoding");
+}
+
+#[test]
+#[should_panic]
+fn test_write_after_try_finish() {
+    use std::io::Write;
+    let mut z = setup_try_finish();
+    z.write_all(b"hello world").unwrap();
+}
+
+fn setup_try_finish() -> Encoder<'static, PartialWrite<Vec<u8>>> {
+    use std::io::Write;
+
+    let buf =
+        PartialWrite::new(Vec::new(), iter::repeat(PartialOp::Unlimited));
+    let mut z = Encoder::new(buf, 19).unwrap();
+
+    z.write_all(b"hello").unwrap();
+
+    z.get_mut()
+        .set_ops(iter::repeat(PartialOp::Err(io::ErrorKind::WouldBlock)));
+
+    let (z, err) = z.try_finish().unwrap_err();
+    assert_eq!(
+        err.kind(),
+        io::ErrorKind::WouldBlock,
+        "expected WouldBlock error"
+    );
+
+    z
+}
+
+#[test]
+fn test_failing_write() {
+    use std::io::Write;
+
+    let buf = PartialWrite::new(
+        Vec::new(),
+        iter::repeat(PartialOp::Err(io::ErrorKind::WouldBlock)),
+    );
+    let mut z = Encoder::new(buf, 1).unwrap();
+
+    // Fill in enough data to make sure the buffer gets written out.
+    let input = vec![b'b'; 128 * 1024];
+    // This should work even though the inner writer rejects writes.
+    assert_eq!(
+        z.write(&input).unwrap(),
+        128 * 1024,
+        "did not write all input buffer"
+    );
+
+    // The next write would fail (the buffer still has some data in it).
+    assert_eq!(
+        z.write(b"abc").unwrap_err().kind(),
+        io::ErrorKind::WouldBlock,
+        "expected WouldBlock error"
+    );
+
+    z.get_mut().set_ops(iter::repeat(PartialOp::Unlimited));
+
+    // This shouldn't have led to any corruption.
+    let buf = z.finish().unwrap().into_inner();
+    assert_eq!(
+        &decode_all(&buf[..]).unwrap(),
+        &input,
+        "WouldBlock errors should not corrupt stream"
+    );
+}
+
+#[test]
+fn test_invalid_frame() {
+    use std::io::Read;
+
+    // I really hope this data is invalid...
+    let data = &[1u8, 2u8, 3u8, 4u8, 5u8];
+    let mut dec = Decoder::new(&data[..]).unwrap();
+    assert_eq!(
+        dec.read_to_end(&mut Vec::new()).err().map(|e| e.kind()),
+        Some(io::ErrorKind::Other),
+        "did not encounter expected 'invalid frame' error"
+    );
+}
+
+#[test]
+fn test_incomplete_frame() {
+    use std::io::{Read, Write};
+
+    let mut enc = Encoder::new(Vec::new(), 1).unwrap();
+    enc.write_all(b"This is a regular string").unwrap();
+    let mut compressed = enc.finish().unwrap();
+
+    // Drop the last two bytes to make the frame incomplete.
+    let truncated_size = compressed.len() - 2;
+    compressed.truncate(truncated_size);
+
+    let mut dec = Decoder::new(&compressed[..]).unwrap();
+    assert_eq!(
+        dec.read_to_end(&mut Vec::new()).err().map(|e| e.kind()),
+        Some(io::ErrorKind::UnexpectedEof),
+        "did not encounter expected EOF error"
+    );
+}
+
+#[test]
+fn test_cli_compatibility() {
+    let input = include_bytes!("../../assets/example.txt.zst");
+
+    let output = decode_all(&input[..]).unwrap();
+
+    let expected = include_bytes!("../../assets/example.txt");
+
+    assert_eq!(
+        &output[..],
+        &expected[..],
+        "error decoding cli-compressed data"
+    );
+}
+
+#[cfg(feature = "legacy")]
+#[test]
+fn test_legacy() {
+    use std::fs;
+    use std::io::Read;
+
+    // Read the content from that file
+    let expected = include_bytes!("../../assets/example.txt");
+
+    for version in &[5, 6, 7, 8] {
+        let filename = format!("assets/example.txt.v{}.zst", version);
+        let file = fs::File::open(filename).unwrap();
+        let mut decoder = Decoder::new(file).unwrap();
+
+        let mut buffer = Vec::new();
+        decoder.read_to_end(&mut buffer).unwrap();
+
+        assert_eq!(
+            &expected[..],
+            &buffer[..],
+            "error decompressing legacy version {}",
+            version
+        );
+    }
+}
+
+// Check that compressing+decompressing some data gives back the original
+fn test_full_cycle(input: &[u8], level: i32) {
+    crate::test_cycle_unwrap(
+        input,
+        |data| encode_all(data, level),
+        |data| decode_all(data),
+    );
+}
+
+#[test]
+fn test_empty() {
+    // Test compressing empty data
+    for level in 1..19 {
+        test_full_cycle(b"", level);
+    }
+}
+
+#[test]
+fn test_ll_source() {
+    // Where could I find some long text?...
+    let data = include_bytes!("../../zstd-safe/zstd-sys/src/bindings_zstd.rs");
+    // Test a few compression levels.
+    // TODO: check them all?
+    for level in 1..5 {
+        // Test compressing actual data
+        test_full_cycle(data, level);
+    }
+}
+
+#[test]
+fn reader_to_writer() {
+    use std::io::{Read, Write};
+
+    let clear = include_bytes!("../../assets/example.txt");
+    // Compress using reader
+    let mut encoder = super::read::Encoder::new(&clear[..], 1).unwrap();
+
+    let mut compressed_buffer = Vec::new();
+    encoder.read_to_end(&mut compressed_buffer).unwrap();
+
+    // eprintln!("Compressed Buffer: {:?}", compressed_buffer);
+
+    // Decompress using writer
+    let mut decompressed_buffer = Vec::new();
+    let mut decoder =
+        super::write::Decoder::new(&mut decompressed_buffer).unwrap();
+    decoder.write_all(&compressed_buffer[..]).unwrap();
+    decoder.flush().unwrap();
+    // eprintln!("{:?}", decompressed_buffer);
+
+    assert_eq!(clear, &decompressed_buffer[..]);
+}
diff --git a/src/stream/write/mod.rs b/src/stream/write/mod.rs
new file mode 100644 (file)
index 0000000..6028b90
--- /dev/null
@@ -0,0 +1,404 @@
+//! Implements the push-based [`Write`] trait for both compressing and decompressing.
+use std::io::{self, Write};
+
+use zstd_safe;
+
+use crate::dict::{DecoderDictionary, EncoderDictionary};
+use crate::stream::{raw, zio};
+
+#[cfg(test)]
+mod tests;
+
+/// An encoder that compresses and forwards data to another writer.
+///
+/// This allows compressing a stream of data
+/// (good for files or heavy network streams).
+///
+/// Don't forget to call [`finish()`] before dropping it!
+///
+/// Alternatively, you can call [`auto_finish()`] to use an
+/// [`AutoFinishEncoder`] that will finish on drop.
+///
+/// Note: The zstd library has its own internal input buffer (~128kb).
+///
+/// [`finish()`]: #method.finish
+/// [`auto_finish()`]: #method.auto_finish
+/// [`AutoFinishEncoder`]: AutoFinishEncoder
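+///
+/// # Examples
+///
+/// A minimal sketch (an editorial addition, not upstream documentation):
+///
+/// ```
+/// use std::io::Write;
+///
+/// let mut encoder =
+///     zstd::stream::write::Encoder::new(Vec::new(), 3).unwrap();
+/// encoder.write_all(b"hello").unwrap();
+/// // `finish()` writes the frame epilogue and returns the inner writer.
+/// let compressed = encoder.finish().unwrap();
+///
+/// assert_eq!(zstd::decode_all(&compressed[..]).unwrap(), b"hello");
+/// ```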
+pub struct Encoder<'a, W: Write> {
+    // output writer (compressed data)
+    writer: zio::Writer<W, raw::Encoder<'a>>,
+}
+
+/// A decoder that decompresses and forwards data to another writer.
+///
+/// Note that you probably want to `flush()` after writing your stream content.
+/// You can use [`auto_flush()`] to automatically flush the writer on drop.
+///
+/// [`auto_flush()`]: Decoder::auto_flush
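+///
+/// # Examples
+///
+/// A minimal sketch (an editorial addition, not upstream documentation):
+///
+/// ```
+/// use std::io::Write;
+///
+/// let compressed = zstd::encode_all(&b"hello"[..], 1).unwrap();
+///
+/// let mut decoder = zstd::stream::write::Decoder::new(Vec::new()).unwrap();
+/// decoder.write_all(&compressed).unwrap();
+/// decoder.flush().unwrap();
+///
+/// assert_eq!(decoder.into_inner(), b"hello");
+/// ```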
+pub struct Decoder<'a, W: Write> {
+    // output writer (decompressed data)
+    writer: zio::Writer<W, raw::Decoder<'a>>,
+}
+
+/// A wrapper around an `Encoder<W>` that finishes the stream on drop.
+///
+/// This can be created by the [`auto_finish()`] method on the [`Encoder`].
+///
+/// [`auto_finish()`]: Encoder::auto_finish
+/// [`Encoder`]: Encoder
+pub struct AutoFinishEncoder<
+    'a,
+    W: Write,
+    F: FnMut(io::Result<W>) = Box<dyn Send + FnMut(io::Result<W>)>,
+> {
+    // We wrap this in an option to take it during drop.
+    encoder: Option<Encoder<'a, W>>,
+
+    on_finish: Option<F>,
+}
+
+/// A wrapper around a `Decoder<W>` that flushes the stream on drop.
+///
+/// This can be created by the [`auto_flush()`] method on the [`Decoder`].
+///
+/// [`auto_flush()`]: Decoder::auto_flush
+/// [`Decoder`]: Decoder
+pub struct AutoFlushDecoder<
+    'a,
+    W: Write,
+    F: FnMut(io::Result<()>) = Box<dyn Send + FnMut(io::Result<()>)>,
+> {
+    // We wrap this in an option to take it during drop.
+    decoder: Option<Decoder<'a, W>>,
+
+    on_flush: Option<F>,
+}
+
+impl<'a, W: Write, F: FnMut(io::Result<()>)> AutoFlushDecoder<'a, W, F> {
+    fn new(decoder: Decoder<'a, W>, on_flush: F) -> Self {
+        AutoFlushDecoder {
+            decoder: Some(decoder),
+            on_flush: Some(on_flush),
+        }
+    }
+
+    /// Acquires a reference to the underlying writer.
+    pub fn get_ref(&self) -> &W {
+        self.decoder.as_ref().unwrap().get_ref()
+    }
+
+    /// Acquires a mutable reference to the underlying writer.
+    ///
+    /// Note that mutation of the writer may result in surprising results if
+    /// this decoder continues to be used.
+    ///
+    /// Mostly used for testing purposes.
+    pub fn get_mut(&mut self) -> &mut W {
+        self.decoder.as_mut().unwrap().get_mut()
+    }
+}
+
+impl<W, F> Drop for AutoFlushDecoder<'_, W, F>
+where
+    W: Write,
+    F: FnMut(io::Result<()>),
+{
+    fn drop(&mut self) {
+        let mut decoder = self.decoder.take().unwrap();
+        let result = decoder.flush();
+        if let Some(mut on_finish) = self.on_flush.take() {
+            on_finish(result);
+        }
+    }
+}
+
+impl<W: Write, F: FnMut(io::Result<()>)> Write for AutoFlushDecoder<'_, W, F> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.decoder.as_mut().unwrap().write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.decoder.as_mut().unwrap().flush()
+    }
+}
+
+impl<'a, W: Write, F: FnMut(io::Result<W>)> AutoFinishEncoder<'a, W, F> {
+    fn new(encoder: Encoder<'a, W>, on_finish: F) -> Self {
+        AutoFinishEncoder {
+            encoder: Some(encoder),
+            on_finish: Some(on_finish),
+        }
+    }
+
+    /// Acquires a reference to the underlying writer.
+    pub fn get_ref(&self) -> &W {
+        self.encoder.as_ref().unwrap().get_ref()
+    }
+
+    /// Acquires a mutable reference to the underlying writer.
+    ///
+    /// Note that mutation of the writer may result in surprising results if
+    /// this encoder continues to be used.
+    ///
+    /// Mostly used for testing purposes.
+    pub fn get_mut(&mut self) -> &mut W {
+        self.encoder.as_mut().unwrap().get_mut()
+    }
+}
+
+impl<W: Write, F: FnMut(io::Result<W>)> Drop for AutoFinishEncoder<'_, W, F> {
+    fn drop(&mut self) {
+        let result = self.encoder.take().unwrap().finish();
+        if let Some(mut on_finish) = self.on_finish.take() {
+            on_finish(result);
+        }
+    }
+}
+
+impl<W: Write, F: FnMut(io::Result<W>)> Write for AutoFinishEncoder<'_, W, F> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.encoder.as_mut().unwrap().write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.encoder.as_mut().unwrap().flush()
+    }
+}
+
+impl<W: Write> Encoder<'static, W> {
+    /// Creates a new encoder.
+    ///
+    /// `level`: compression level (1-21).
+    ///
+    /// A level of `0` uses zstd's default (currently `3`).
+    pub fn new(writer: W, level: i32) -> io::Result<Self> {
+        Self::with_dictionary(writer, level, &[])
+    }
+
+    /// Creates a new encoder, using an existing dictionary.
+    ///
+    /// (Provides better compression ratio for small files,
+    /// but requires the dictionary to be present during decompression.)
+    ///
+    /// A level of `0` uses zstd's default (currently `3`).
+    pub fn with_dictionary(
+        writer: W,
+        level: i32,
+        dictionary: &[u8],
+    ) -> io::Result<Self> {
+        let encoder = raw::Encoder::with_dictionary(level, dictionary)?;
+        let writer = zio::Writer::new(writer, encoder);
+        Ok(Encoder { writer })
+    }
+}
+
+impl<'a, W: Write> Encoder<'a, W> {
+    /// Creates a new encoder, using an existing prepared `EncoderDictionary`.
+    ///
+    /// (Provides better compression ratio for small files,
+    /// but requires the dictionary to be present during decompression.)
+    pub fn with_prepared_dictionary<'b>(
+        writer: W,
+        dictionary: &EncoderDictionary<'b>,
+    ) -> io::Result<Self>
+    where
+        'b: 'a,
+    {
+        let encoder = raw::Encoder::with_prepared_dictionary(dictionary)?;
+        let writer = zio::Writer::new(writer, encoder);
+        Ok(Encoder { writer })
+    }
+
+    /// Returns a wrapper around `self` that will finish the stream on drop.
+    ///
+    /// # Panics
+    ///
+    /// Panics on drop if an error happens when finishing the stream.
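+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch (an editorial addition, not upstream documentation):
+    ///
+    /// ```
+    /// use std::io::Write;
+    ///
+    /// let mut output = Vec::new();
+    /// {
+    ///     let mut encoder = zstd::stream::write::Encoder::new(&mut output, 1)
+    ///         .unwrap()
+    ///         .auto_finish();
+    ///     encoder.write_all(b"hello").unwrap();
+    /// } // Dropped here: the frame is finished automatically.
+    ///
+    /// assert_eq!(zstd::decode_all(&output[..]).unwrap(), b"hello");
+    /// ```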
+    pub fn auto_finish(self) -> AutoFinishEncoder<'a, W> {
+        self.on_finish(Box::new(|result| {
+            result.unwrap();
+        }))
+    }
+
+    /// Returns an encoder that will finish the stream on drop.
+    ///
+    /// Calls the given callback with the result from `finish()`.
+    pub fn on_finish<F: FnMut(io::Result<W>)>(
+        self,
+        f: F,
+    ) -> AutoFinishEncoder<'a, W, F> {
+        AutoFinishEncoder::new(self, f)
+    }
+
+    /// Acquires a reference to the underlying writer.
+    pub fn get_ref(&self) -> &W {
+        self.writer.writer()
+    }
+
+    /// Acquires a mutable reference to the underlying writer.
+    ///
+    /// Note that mutation of the writer may result in surprising results if
+    /// this encoder continues to be used.
+    pub fn get_mut(&mut self) -> &mut W {
+        self.writer.writer_mut()
+    }
+
+    /// **Required**: Finishes the stream.
+    ///
+    /// You *need* to finish the stream when you're done writing, either with
+    /// this method or with [`try_finish(self)`](#method.try_finish).
+    ///
+    /// This returns the inner writer in case you need it.
+    ///
+    /// To get back `self` in case an error happened, use `try_finish`.
+    ///
+    /// **Note**: If you don't want (or can't) call `finish()` manually after
+    ///           writing your data, consider using `auto_finish()` to get an
+    ///           `AutoFinishEncoder`.
+    pub fn finish(self) -> io::Result<W> {
+        self.try_finish().map_err(|(_, err)| err)
+    }
+
+    /// **Required**: Attempts to finish the stream.
+    ///
+    /// You *need* to finish the stream when you're done writing, either with
+    /// this method or with [`finish(self)`](#method.finish).
+    ///
+    /// This returns the inner writer if the finish was successful, or the
+    /// object plus an error if it wasn't.
+    ///
+    /// `write` on this object will panic after `try_finish` has been called,
+    /// even if it fails.
+    pub fn try_finish(mut self) -> Result<W, (Self, io::Error)> {
+        match self.writer.finish() {
+            // Return the writer, because why not
+            Ok(()) => Ok(self.writer.into_inner().0),
+            Err(e) => Err((self, e)),
+        }
+    }
+
+    /// Attempts to finish the stream.
+    ///
+    /// You *need* to finish the stream when you're done writing, either with
+    /// this method or with [`finish(self)`](#method.finish).
+    pub fn do_finish(&mut self) -> io::Result<()> {
+        self.writer.finish()
+    }
+
+    /// Return a recommendation for the size of data to write at once.
+    pub fn recommended_input_size() -> usize {
+        zstd_safe::CCtx::in_size()
+    }
+
+    crate::encoder_common!(writer);
+}
+
+impl<'a, W: Write> Write for Encoder<'a, W> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.writer.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.writer.flush()
+    }
+}
+
+impl<W: Write> Decoder<'static, W> {
+    /// Creates a new decoder.
+    pub fn new(writer: W) -> io::Result<Self> {
+        Self::with_dictionary(writer, &[])
+    }
+
+    /// Creates a new decoder, using an existing dictionary.
+    ///
+    /// (The dictionary must be the same as the one
+    /// used during compression.)
+    pub fn with_dictionary(writer: W, dictionary: &[u8]) -> io::Result<Self> {
+        let decoder = raw::Decoder::with_dictionary(dictionary)?;
+        let writer = zio::Writer::new(writer, decoder);
+        Ok(Decoder { writer })
+    }
+}
+
+impl<'a, W: Write> Decoder<'a, W> {
+    /// Creates a new decoder, using an existing prepared `DecoderDictionary`.
+    ///
+    /// (The dictionary must be the same as the one
+    /// used during compression.)
+    pub fn with_prepared_dictionary<'b>(
+        writer: W,
+        dictionary: &DecoderDictionary<'b>,
+    ) -> io::Result<Self>
+    where
+        'b: 'a,
+    {
+        let decoder = raw::Decoder::with_prepared_dictionary(dictionary)?;
+        let writer = zio::Writer::new(writer, decoder);
+        Ok(Decoder { writer })
+    }
+
+    /// Acquires a reference to the underlying writer.
+    pub fn get_ref(&self) -> &W {
+        self.writer.writer()
+    }
+
+    /// Acquires a mutable reference to the underlying writer.
+    ///
+    /// Note that mutation of the writer may result in surprising results if
+    /// this decoder continues to be used.
+    pub fn get_mut(&mut self) -> &mut W {
+        self.writer.writer_mut()
+    }
+
+    /// Returns the inner `Write`.
+    pub fn into_inner(self) -> W {
+        self.writer.into_inner().0
+    }
+
+    /// Return a recommendation for the size of data to write at once.
+    pub fn recommended_input_size() -> usize {
+        zstd_safe::DCtx::in_size()
+    }
+
+    /// Returns a wrapper around `self` that will flush the stream on drop.
+    ///
+    /// # Panics
+    ///
+    /// Panics on drop if an error happens when flushing the stream.
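+    ///
+    /// # Examples
+    ///
+    /// A minimal sketch (an editorial addition, not upstream documentation):
+    ///
+    /// ```
+    /// use std::io::Write;
+    ///
+    /// let compressed = zstd::encode_all(&b"hello"[..], 1).unwrap();
+    ///
+    /// let mut output = Vec::new();
+    /// {
+    ///     let mut decoder = zstd::stream::write::Decoder::new(&mut output)
+    ///         .unwrap()
+    ///         .auto_flush();
+    ///     decoder.write_all(&compressed).unwrap();
+    /// } // Dropped here: the decoded data is flushed to `output`.
+    ///
+    /// assert_eq!(output, b"hello");
+    /// ```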
+    pub fn auto_flush(self) -> AutoFlushDecoder<'a, W> {
+        self.on_flush(Box::new(|result| {
+            result.unwrap();
+        }))
+    }
+
+    /// Returns a decoder that will flush the stream on drop.
+    ///
+    /// Calls the given callback with the result from `flush()`.
+    pub fn on_flush<F: FnMut(io::Result<()>)>(
+        self,
+        f: F,
+    ) -> AutoFlushDecoder<'a, W, F> {
+        AutoFlushDecoder::new(self, f)
+    }
+
+    crate::decoder_common!(writer);
+}
+
+impl<W: Write> Write for Decoder<'_, W> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.writer.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.writer.flush()
+    }
+}
+
+fn _assert_traits() {
+    fn _assert_send<T: Send>(_: T) {}
+
+    _assert_send(Decoder::new(Vec::new()));
+    _assert_send(Encoder::new(Vec::new(), 1));
+    _assert_send(Decoder::new(Vec::new()).unwrap().auto_flush());
+    _assert_send(Encoder::new(Vec::new(), 1).unwrap().auto_finish());
+}
diff --git a/src/stream/write/tests.rs b/src/stream/write/tests.rs
new file mode 100644 (file)
index 0000000..dee1350
--- /dev/null
@@ -0,0 +1,72 @@
+use std::io::{Cursor, Write};
+use std::iter;
+
+use partial_io::{PartialOp, PartialWrite};
+
+use crate::stream::decode_all;
+use crate::stream::write::{Decoder, Encoder};
+
+#[test]
+fn test_cycle() {
+    let input = b"Abcdefghabcdefgh";
+
+    let buffer = Cursor::new(Vec::new());
+    let mut encoder = Encoder::new(buffer, 1).unwrap();
+    encoder.write_all(input).unwrap();
+    let encoded = encoder.finish().unwrap().into_inner();
+
+    // println!("Encoded: {:?}", encoded);
+
+    let buffer = Cursor::new(Vec::new());
+    let mut decoder = Decoder::new(buffer).unwrap();
+    decoder.write_all(&encoded).unwrap();
+    decoder.flush().unwrap();
+    let decoded = decoder.into_inner().into_inner();
+
+    assert_eq!(input, &decoded[..]);
+}
+
+/// Test that flush after a partial write works successfully without
+/// corrupting the frame. This test is in this module because it checks
+/// internal implementation details.
+#[test]
+fn test_partial_write_flush() {
+    let input = vec![b'b'; 128 * 1024];
+    let mut z = setup_partial_write(&input);
+
+    // flush shouldn't corrupt the stream
+    z.flush().unwrap();
+
+    let buf = z.finish().unwrap().into_inner();
+    assert_eq!(&decode_all(&buf[..]).unwrap(), &input);
+}
+
+/// Test that finish after a partial write works successfully without
+/// corrupting the frame. This test is in this module because it checks
+/// internal implementation details.
+#[test]
+fn test_partial_write_finish() {
+    let input = vec![b'b'; 128 * 1024];
+    let z = setup_partial_write(&input);
+
+    // finish shouldn't corrupt the stream
+    let buf = z.finish().unwrap().into_inner();
+    assert_eq!(&decode_all(&buf[..]).unwrap(), &input);
+}
+
+fn setup_partial_write(input_data: &[u8]) -> Encoder<PartialWrite<Vec<u8>>> {
+    let buf =
+        PartialWrite::new(Vec::new(), iter::repeat(PartialOp::Limited(1)));
+    let mut z = Encoder::new(buf, 1).unwrap();
+
+    // Fill in enough data to make sure the buffer gets written out.
+    z.write(input_data).unwrap();
+
+    {
+        let inner = &mut z.writer;
+        // At this point, the internal buffer in z should have some data.
+        assert_ne!(inner.offset(), inner.buffer().len());
+    }
+
+    z
+}
diff --git a/src/stream/zio/mod.rs b/src/stream/zio/mod.rs
new file mode 100644 (file)
index 0000000..2ceef82
--- /dev/null
@@ -0,0 +1,7 @@
+//! Wrappers around raw operations implementing `std::io::{Read, Write}`.
+
+mod reader;
+mod writer;
+
+pub use self::reader::Reader;
+pub use self::writer::Writer;
diff --git a/src/stream/zio/reader.rs b/src/stream/zio/reader.rs
new file mode 100644 (file)
index 0000000..4214bbd
--- /dev/null
@@ -0,0 +1,241 @@
+use std::io::{self, BufRead, Read};
+
+use crate::stream::raw::{InBuffer, Operation, OutBuffer};
+
+// [ reader -> zstd ] -> output
+/// Implements the [`Read`] API around an [`Operation`].
+///
+/// This can be used to wrap a raw in-memory operation in a read-focused API.
+///
+/// It can wrap either a compression or decompression operation, and pulls
+/// input data from a wrapped `Read`.
+pub struct Reader<R, D> {
+    reader: R,
+    operation: D,
+
+    state: State,
+
+    single_frame: bool,
+    finished_frame: bool,
+}
+
+enum State {
+    // Still actively reading from the inner `Read`
+    Reading,
+    // We reached EOF from the inner `Read`, now flushing.
+    PastEof,
+    // We are fully done, nothing can be read.
+    Finished,
+}
+
+impl<R, D> Reader<R, D> {
+    /// Creates a new `Reader`.
+    ///
+    /// `reader` will be used to pull input data for the given operation.
+    pub fn new(reader: R, operation: D) -> Self {
+        Reader {
+            reader,
+            operation,
+            state: State::Reading,
+            single_frame: false,
+            finished_frame: false,
+        }
+    }
+
+    /// Sets `self` to stop after the first decoded frame.
+    pub fn set_single_frame(&mut self) {
+        self.single_frame = true;
+    }
+
+    /// Returns a mutable reference to the underlying operation.
+    pub fn operation_mut(&mut self) -> &mut D {
+        &mut self.operation
+    }
+
+    /// Returns a mutable reference to the underlying reader.
+    pub fn reader_mut(&mut self) -> &mut R {
+        &mut self.reader
+    }
+
+    /// Returns a reference to the underlying reader.
+    pub fn reader(&self) -> &R {
+        &self.reader
+    }
+
+    /// Returns the inner reader.
+    pub fn into_inner(self) -> R {
+        self.reader
+    }
+
+    /// Flush any internal buffer.
+    ///
+    /// For encoders, this ensures all input consumed so far is compressed.
+    pub fn flush(&mut self, output: &mut [u8]) -> io::Result<usize>
+    where
+        D: Operation,
+    {
+        self.operation.flush(&mut OutBuffer::around(output))
+    }
+}
+// Read and retry on Interrupted errors.
+fn fill_buf<R>(reader: &mut R) -> io::Result<&[u8]>
+where
+    R: BufRead,
+{
+    // This doesn't work right now because of the borrow-checker.
+    // When it can be made to compile, it would allow Reader to automatically
+    // retry on `Interrupted` error.
+    /*
+    loop {
+        match reader.fill_buf() {
+            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+            otherwise => return otherwise,
+        }
+    }
+    */
+
+    // Workaround for now
+    let res = reader.fill_buf()?;
+
+    // eprintln!("Filled buffer: {:?}", res);
+
+    Ok(res)
+}
+
+impl<R, D> Read for Reader<R, D>
+where
+    R: BufRead,
+    D: Operation,
+{
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        // Keep trying until _something_ has been written.
+        let mut first = true;
+        loop {
+            match self.state {
+                State::Reading => {
+                    let (bytes_read, bytes_written) = {
+                        // Start with a fresh pool of un-processed data.
+                        // This is the only line that can return an interruption error.
+                        let input = if first {
+                            // eprintln!("First run, no input coming.");
+                            b""
+                        } else {
+                            fill_buf(&mut self.reader)?
+                        };
+
+                        // eprintln!("Input = {:?}", input);
+
+                        // It's possible we don't have any new data to read.
+                        // (In this case we may still have zstd's own buffer to clear.)
+                        if !first && input.is_empty() {
+                            self.state = State::PastEof;
+                            continue;
+                        }
+                        first = false;
+
+                        let mut src = InBuffer::around(input);
+                        let mut dst = OutBuffer::around(buf);
+
+                        // We don't want empty input (from first=true) to cause a frame
+                        // re-initialization.
+                        if self.finished_frame && !input.is_empty() {
+                            // eprintln!("!! Reigniting !!");
+                            self.operation.reinit()?;
+                            self.finished_frame = false;
+                        }
+
+                        // Phase 1: feed input to the operation
+                        let hint = self.operation.run(&mut src, &mut dst)?;
+                        // eprintln!(
+                        //     "Hint={} Just run our operation:\n In={:?}\n Out={:?}",
+                        //     hint, src, dst
+                        // );
+
+                        if hint == 0 {
+                            // In practice this only happens when decoding, when we just finished
+                            // reading a frame.
+                            self.finished_frame = true;
+                            if self.single_frame {
+                                self.state = State::Finished;
+                            }
+                        }
+
+                        // eprintln!("Output: {:?}", dst);
+
+                        (src.pos(), dst.pos())
+                    };
+
+                    self.reader.consume(bytes_read);
+
+                    if bytes_written > 0 {
+                        return Ok(bytes_written);
+                    }
+
+                    // We need more data! Try again!
+                }
+                State::PastEof => {
+                    let mut dst = OutBuffer::around(buf);
+
+                    // We already sent all the input we could get to zstd. Time to flush out the
+                    // buffer and be done with it.
+
+                    // Phase 2: flush out the operation's buffer
+                    // Keep calling `finish()` until the buffer is empty.
+                    let hint = self
+                        .operation
+                        .finish(&mut dst, self.finished_frame)?;
+                    // eprintln!("Hint: {} ; Output: {:?}", hint, dst);
+                    if hint == 0 {
+                        // This indicates that the footer is complete.
+                        // This is the only way to terminate the stream cleanly.
+                        self.state = State::Finished;
+                    }
+
+                    return Ok(dst.pos());
+                }
+                State::Finished => {
+                    return Ok(0);
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::Reader;
+    use std::io::{Cursor, Read};
+
+    #[test]
+    fn test_noop() {
+        use crate::stream::raw::NoOp;
+
+        let input = b"AbcdefghAbcdefgh.";
+
+        // Test reader
+        let mut output = Vec::new();
+        {
+            let mut reader = Reader::new(Cursor::new(input), NoOp);
+            reader.read_to_end(&mut output).unwrap();
+        }
+        assert_eq!(&output, input);
+    }
+
+    #[test]
+    fn test_compress() {
+        use crate::stream::raw::Encoder;
+
+        let input = b"AbcdefghAbcdefgh.";
+
+        // Test reader
+        let mut output = Vec::new();
+        {
+            let mut reader =
+                Reader::new(Cursor::new(input), Encoder::new(1).unwrap());
+            reader.read_to_end(&mut output).unwrap();
+        }
+        // eprintln!("{:?}", output);
+        let decoded = crate::decode_all(&output[..]).unwrap();
+        assert_eq!(&decoded, input);
+    }
+}
diff --git a/src/stream/zio/writer.rs b/src/stream/zio/writer.rs
new file mode 100644 (file)
index 0000000..de1c53d
--- /dev/null
@@ -0,0 +1,296 @@
+use std::io::{self, Write};
+
+use crate::stream::raw::{InBuffer, Operation, OutBuffer};
+
+// input -> [ zstd -> buffer -> writer ]
+
+/// Implements the [`Write`] API around an [`Operation`].
+///
+/// This can be used to wrap a raw in-memory operation in a write-focused API.
+///
+/// It can be used with either compression or decompression, and forwards the
+/// output to a wrapped `Write`.
+pub struct Writer<W, D> {
+    writer: W,
+    operation: D,
+
+    offset: usize,
+    buffer: Vec<u8>,
+
+    // When `true`, indicates that nothing should be added to the buffer.
+    // All that's left is to empty the buffer.
+    finished: bool,
+
+    finished_frame: bool,
+}
+
+impl<W, D> Writer<W, D>
+where
+    W: Write,
+    D: Operation,
+{
+    /// Creates a new `Writer`.
+    ///
+    /// All output from the given operation will be forwarded to `writer`.
+    pub fn new(writer: W, operation: D) -> Self {
+        Writer {
+            writer,
+            operation,
+
+            offset: 0,
+            // 32KB buffer? That's what flate2 uses
+            buffer: Vec::with_capacity(32 * 1024),
+
+            finished: false,
+            finished_frame: false,
+        }
+    }
+
+    /// Ends the stream.
+    ///
+    /// This *must* be called after all data has been written to finish the
+    /// stream.
+    ///
+    /// If you forget to call this and just drop the `Writer`, you *will* have
+    /// an incomplete output.
+    ///
+    /// Keep calling it until it returns `Ok(())`, then don't call it again.
+    pub fn finish(&mut self) -> io::Result<()> {
+        loop {
+            // Keep trying until we're really done.
+            self.write_from_offset()?;
+
+            // At this point the buffer has been fully written out.
+
+            if self.finished {
+                return Ok(());
+            }
+
+            // Let's fill this buffer again!
+
+            let finished_frame = self.finished_frame;
+            let hint =
+                self.with_buffer(|dst, op| op.finish(dst, finished_frame));
+            self.offset = 0;
+            // println!("Hint: {:?}\nOut:{:?}", hint, &self.buffer);
+
+            // We return here if zstd had a problem.
+            // Could happen with invalid data, ...
+            let hint = hint?;
+
+            if hint != 0 && self.buffer.is_empty() {
+                // This happens if we are decoding an incomplete frame.
+                return Err(io::Error::new(
+                    io::ErrorKind::UnexpectedEof,
+                    "incomplete frame",
+                ));
+            }
+
+            // println!("Finishing {}, {}", bytes_written, hint);
+
+            self.finished = hint == 0;
+        }
+    }
+
+    /// Run the given closure on `self.buffer`.
+    ///
+    /// The buffer will be cleared, and made available wrapped in an `OutBuffer`.
+    fn with_buffer<F, T>(&mut self, f: F) -> T
+    where
+        F: FnOnce(&mut OutBuffer<'_, Vec<u8>>, &mut D) -> T,
+    {
+        self.buffer.clear();
+        let mut output = OutBuffer::around(&mut self.buffer);
+        // eprintln!("Output: {:?}", output);
+        f(&mut output, &mut self.operation)
+    }
+
+    /// Attempt to write `self.buffer` to the wrapped writer.
+    ///
+    /// Returns `Ok(())` once all the buffer has been written.
+    fn write_from_offset(&mut self) -> io::Result<()> {
+        // The code looks a lot like `write_all`, but keeps track of what has
+        // been written in case we're interrupted.
+        while self.offset < self.buffer.len() {
+            match self.writer.write(&self.buffer[self.offset..]) {
+                Ok(0) => {
+                    return Err(io::Error::new(
+                        io::ErrorKind::WriteZero,
+                        "writer will not accept any more data",
+                    ))
+                }
+                Ok(n) => self.offset += n,
+                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => (),
+                Err(e) => return Err(e),
+            }
+        }
+        Ok(())
+    }
+
+    /// Return the wrapped `Writer` and `Operation`.
+    ///
+    /// Careful: if you call this before calling [`Writer::finish()`], the
+    /// output may be incomplete.
+    pub fn into_inner(self) -> (W, D) {
+        (self.writer, self.operation)
+    }
+
+    /// Gives a reference to the inner writer.
+    pub fn writer(&self) -> &W {
+        &self.writer
+    }
+
+    /// Gives a mutable reference to the inner writer.
+    pub fn writer_mut(&mut self) -> &mut W {
+        &mut self.writer
+    }
+
+    /// Gives a reference to the inner operation.
+    pub fn operation(&self) -> &D {
+        &self.operation
+    }
+
+    /// Gives a mutable reference to the inner operation.
+    pub fn operation_mut(&mut self) -> &mut D {
+        &mut self.operation
+    }
+
+    /// Returns the offset in the current buffer. Only useful for debugging.
+    #[cfg(test)]
+    pub fn offset(&self) -> usize {
+        self.offset
+    }
+
+    /// Returns the current buffer. Only useful for debugging.
+    #[cfg(test)]
+    pub fn buffer(&self) -> &[u8] {
+        &self.buffer
+    }
+}
+
+impl<W, D> Write for Writer<W, D>
+where
+    W: Write,
+    D: Operation,
+{
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        // Keep trying until _something_ has been consumed.
+        // As soon as some input has been taken, we cannot afford to take
+        // any chances: if an error occurred, the user would have no way to
+        // know that some data _was_ successfully written.
+        loop {
+            // First, write any pending data from `self.buffer`.
+            self.write_from_offset()?;
+            // At this point `self.buffer` can safely be discarded.
+
+            // Support writing concatenated frames by re-initializing the
+            // context.
+            if self.finished_frame {
+                self.operation.reinit()?;
+                self.finished_frame = false;
+            }
+
+            let mut src = InBuffer::around(buf);
+            let hint = self.with_buffer(|dst, op| op.run(&mut src, dst));
+            let bytes_read = src.pos;
+
+            // eprintln!(
+            //     "Write Hint: {:?}\n src: {:?}\n dst: {:?}",
+            //     hint, src, self.buffer
+            // );
+
+            self.offset = 0;
+            let hint = hint?;
+
+            if hint == 0 {
+                self.finished_frame = true;
+            }
+
+            // As we said, as soon as we've consumed something, return.
+            if bytes_read > 0 || buf.is_empty() {
+                // println!("Returning {}", bytes_read);
+                return Ok(bytes_read);
+            }
+        }
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        let mut finished = self.finished;
+        loop {
+            // If the output is blocked or has an error, return now.
+            self.write_from_offset()?;
+
+            if finished {
+                break;
+            }
+
+            let hint = self.with_buffer(|dst, op| op.flush(dst));
+
+            self.offset = 0;
+            let hint = hint?;
+
+            finished = hint == 0;
+        }
+
+        self.writer.flush()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::Writer;
+    use std::io::Write;
+
+    #[test]
+    fn test_noop() {
+        use crate::stream::raw::NoOp;
+
+        let input = b"AbcdefghAbcdefgh.";
+
+        // Test writer
+        let mut output = Vec::new();
+        {
+            let mut writer = Writer::new(&mut output, NoOp);
+            writer.write_all(input).unwrap();
+            writer.finish().unwrap();
+        }
+        assert_eq!(&output, input);
+    }
+
+    #[test]
+    fn test_compress() {
+        use crate::stream::raw::Encoder;
+
+        let input = b"AbcdefghAbcdefgh.";
+
+        // Test writer
+        let mut output = Vec::new();
+        {
+            let mut writer =
+                Writer::new(&mut output, Encoder::new(1).unwrap());
+            writer.write_all(input).unwrap();
+            writer.finish().unwrap();
+        }
+        // println!("Output: {:?}", output);
+        let decoded = crate::decode_all(&output[..]).unwrap();
+        assert_eq!(&decoded, input);
+    }
+
+    #[test]
+    fn test_decompress() {
+        use crate::stream::raw::Decoder;
+
+        let input = b"AbcdefghAbcdefgh.";
+        let compressed = crate::encode_all(&input[..], 1).unwrap();
+
+        // Test writer
+        let mut output = Vec::new();
+        {
+            let mut writer = Writer::new(&mut output, Decoder::new().unwrap());
+            writer.write_all(&compressed).unwrap();
+            writer.finish().unwrap();
+        }
+        // println!("Output: {:?}", output);
+        assert_eq!(&output, input);
+    }
+}
diff --git a/tests/issue_182.rs b/tests/issue_182.rs
new file mode 100644 (file)
index 0000000..f22ef7b
--- /dev/null
@@ -0,0 +1,16 @@
+const TEXT: &[u8] = include_bytes!("../assets/example.txt");
+
+#[test]
+#[should_panic]
+fn test_issue_182() {
+    use std::io::BufRead;
+
+    let compressed = zstd::encode_all(TEXT, 3).unwrap();
+    let truncated = &compressed[..compressed.len() / 2];
+
+    let rdr = zstd::Decoder::new(truncated).unwrap();
+    let rdr = std::io::BufReader::new(rdr);
+    for line in rdr.lines() {
+        line.unwrap();
+    }
+}