--- /dev/null
+The [`datapipes`](https://github.com/pytorch/pytorch/tree/master/torch/utils/data/datapipes) folder holds the implementations of `IterDataPipe` and `MapDataPipe`.
+
+This document serves as an entry point for DataPipe implementation.
+
+## Implementing DataPipe
+As a guiding example, let us implement an `IterDataPipe` that applies a callable to data, under [`iter`](https://github.com/pytorch/pytorch/tree/master/torch/utils/data/datapipes/iter).
+For `MapDataPipe`, take the files in the [`map`](https://github.com/pytorch/pytorch/tree/master/torch/utils/data/datapipes/map) folder as reference and implement the corresponding `__getitem__` method; a minimal sketch follows.
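+
+This sketch assumes `MapDataPipe` is importable from `torch.utils.data`; `MapperMapDataPipe` is a hypothetical name, not a class defined in this codebase:
+```py
+from torch.utils.data import MapDataPipe
+
+
+class MapperMapDataPipe(MapDataPipe):
+    def __init__(self, dp, fn):
+        super().__init__()
+        self.dp = dp
+        self.fn = fn
+
+    def __getitem__(self, index):
+        # Apply the callable lazily, on element access
+        return self.fn(self.dp[index])
+
+    def __len__(self):
+        return len(self.dp)
+```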
+
+### Naming
+The naming convention for a DataPipe is Operation-er, with the suffix `IterDataPipe`, because each DataPipe behaves like a container that applies an operation to data yielded from its source DataPipe.
+When a DataPipe is imported into the `iter` module under `datapipes`, it is aliased as Op-er, without the `IterDataPipe` suffix.
+Please check [`__init__.py`](https://github.com/pytorch/pytorch/blob/master/torch/utils/data/datapipes/iter/__init__.py) in the `iter` module for how each DataPipe class is aliased.
+For our example of an `IterDataPipe` that maps a function, we name it `MapperIterDataPipe` and alias it as `iter.Mapper` under `datapipes`.
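+The alias boils down to an import statement along these lines (a sketch of the pattern; the exact module path inside `iter` may differ):
+```py
+from torch.utils.data.datapipes.iter.callable import MapperIterDataPipe as Mapper
+```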
+
+### Constructor
+As a dataset is now constructed by a stack of DataPipe-s, each DataPipe normally takes a source DataPipe as its first argument.
+```py
+class MapperIterDataPipe(IterDataPipe):
+    def __init__(self, dp, fn):
+        super().__init__()
+        self.dp = dp  # source DataPipe
+        self.fn = fn  # callable applied to each element
+```
+Note: Avoid loading data from the source DataPipe in `__init__`, in order to support lazy data loading and save memory.
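+
+For instance, the following hypothetical variant defeats lazy loading by materializing everything at construction time:
+```py
+class EagerMapperIterDataPipe(IterDataPipe):
+    def __init__(self, dp, fn):
+        super().__init__()
+        # Anti-pattern: consumes and stores the whole source DataPipe up front
+        self.data = [fn(d) for d in dp]
+```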
+
+### Iterator
+For an `IterDataPipe`, an `__iter__` method is needed to consume data from the source `IterDataPipe`, then apply the operation over the data before yielding.
+```py
+class MapperIterDataPipe(IterDataPipe):
+ ...
+
+    def __iter__(self):
+        for d in self.dp:
+            yield self.fn(d)
+```
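+
+Because `__iter__` only requires the source to be iterable, a quick sanity check can use a plain list as the source:
+```py
+>>> dp = MapperIterDataPipe([1, 2, 3], fn=lambda x: x * 2)
+>>> list(dp)
+[2, 4, 6]
+```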
+
+### Length
+In most common cases, as in the `MapperIterDataPipe` example above, the `__len__` method of a DataPipe returns the length of the source DataPipe.
+```py
+class MapperIterDataPipe(IterDataPipe):
+ ...
+
+    def __len__(self):
+        return len(self.dp)
+```
+Note that the `__len__` method is optional for `IterDataPipe`.
+Like `CSVParserIterDataPipe` in the [Using DataPipe section](#using-datapipe), `__len__` is not implemented because the size of each file stream is unknown to us before loading it.
+
+Besides, in some special cases, a `__len__` method can be provided that either returns an integer length or raises an error, depending on the arguments of the DataPipe.
+The error is required to be a `TypeError` to support Python's built-in functions like `list(dp)`.
+Please check NOTE [ Lack of Default `__len__` in Python Abstract Base Classes ] in the PyTorch codebase for the detailed reason.
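+
+A hypothetical sketch of that pattern (`CyclerIterDataPipe` and its `count` argument are made up for illustration):
+```py
+class CyclerIterDataPipe(IterDataPipe):
+    def __init__(self, dp, count=None):
+        super().__init__()
+        self.dp = dp
+        self.count = count  # None means cycle forever
+
+    def __iter__(self):
+        i = 0
+        while self.count is None or i < self.count:
+            yield from self.dp
+            i += 1
+
+    def __len__(self):
+        if self.count is None:
+            # Infinite DataPipe: raise TypeError rather than return a value,
+            # so built-ins like list(dp) can handle the missing length
+            raise TypeError(f"{type(self).__name__} instance doesn't have valid length")
+        return self.count * len(self.dp)
+```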
+
+### Registering DataPipe with functional API
+Each DataPipe can be registered to support the functional API using the decorator `functional_datapipe`.
+```py
+@functional_datapipe("map")
+class MapperIterDataPipe(IterDataPipe):
+ ...
+```
+The stack of DataPipes can then be constructed in a functional-programming manner.
+```py
+>>> import torch.utils.data.datapipes as dp
+>>> datapipes1 = dp.iter.FileLoader(['a.file', 'b.file']).map(fn=decoder).shuffle().batch(2)
+
+>>> datapipes2 = dp.iter.FileLoader(['a.file', 'b.file'])
+>>> datapipes2 = dp.iter.Mapper(datapipes2, fn=decoder)
+>>> datapipes2 = dp.iter.Shuffler(datapipes2)
+>>> datapipes2 = dp.iter.Batcher(datapipes2, 2)
+```
+In the above example, `datapipes1` and `datapipes2` represent the exact same stack of `IterDataPipe`-s.
+
+## Using DataPipe
+For example, suppose we want to load data from CSV files with the following data pipeline:
+- List all CSV files
+- Load the CSV files
+- Parse each CSV file and yield rows
+
+To support the above pipeline, `CSVParser` is registered as `parse_csv_files` to consume file streams and expand them as rows.
+```py
+import csv
+
+
+@functional_datapipe("parse_csv_files")
+class CSVParserIterDataPipe(IterDataPipe):
+    def __init__(self, dp, **fmtparams):
+        self.dp = dp
+        self.fmtparams = fmtparams  # keyword arguments forwarded to csv.reader
+
+    def __iter__(self):
+        for filename, stream in self.dp:
+            reader = csv.reader(stream, **self.fmtparams)
+            for row in reader:
+                yield filename, row
+```
+Then, the pipeline can be assembled as follows:
+```py
+>>> import torch.utils.data.datapipes as dp
+
+>>> FOLDER = 'path/2/csv/folder'
+>>> datapipe = dp.iter.FileLister([FOLDER]).filter(fn=lambda filename: filename.endswith('.csv'))
+>>> datapipe = dp.iter.FileLoader(datapipe, mode='rt')
+>>> datapipe = datapipe.parse_csv_files(delimiter=' ')
+
+>>> for d in datapipe: # Start loading data
+...     pass
+```
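+
+Since `IterDataPipe` is a subclass of `IterableDataset`, the assembled pipeline can also be fed to a `DataLoader` directly (a sketch; `batch_size=None` leaves the rows unbatched):
+```py
+>>> from torch.utils.data import DataLoader
+>>> for filename, row in DataLoader(datapipe, batch_size=None):
+...     pass
+```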
+++ /dev/null
-{
- "metadata": {
- "language_info": {
- "codemirror_mode": {
- "name": "ipython",
- "version": 3
- },
- "file_extension": ".py",
- "mimetype": "text/x-python",
- "name": "python",
- "nbconvert_exporter": "python",
- "pygments_lexer": "ipython3",
- "version": "3.6.10"
- },
- "orig_nbformat": 2,
- "kernelspec": {
- "name": "python3610jvsc74a57bd0eb5e09632d6ea1cbf3eb9da7e37b7cf581db5ed13074b21cc44e159dc62acdab",
- "display_name": "Python 3.6.10 64-bit ('dataloader': conda)"
- }
- },
- "nbformat": 4,
- "nbformat_minor": 2,
- "cells": [
- {
- "source": [
- "## DataPipes development tutorial. Loaders DataPipes."
- ],
- "cell_type": "markdown",
- "metadata": {}
- },
- {
- "source": [
- "As DataSet now constructed by stacking `DataPipe`-s it is recommended to keep `DataPipe` functionality as primitive as possible. For example loading data from CSV file will look like sequence of DataPipes: ListFiles FileLoader CSVParser.\n",
- "\n"
- ],
- "cell_type": "markdown",
- "metadata": {}
- },
- {
- "source": [
- "`ExampleListFilesDataPipe` scans all files in `root` folder and yields full file names. Avoid loading entire list in `__init__` function to save memory."
- ],
- "cell_type": "markdown",
- "metadata": {}
- },
- {
- "cell_type": "code",
- "execution_count": 1,
- "metadata": {},
- "outputs": [],
- "source": [
- "import csv\n",
- "import io\n",
- "import os\n",
- "\n",
- "from torch.utils.data import IterDataPipe, functional_datapipe\n",
- "\n",
- "\n",
- "class ExampleListFilesDataPipe(IterDataPipe):\n",
- " def __init__(self, *, root):\n",
- " self.root = root\n",
- "\n",
- " def __iter__(self):\n",
- " for (dirpath, dirnames, filenames) in os.walk(self.root):\n",
- " for file_name in filenames:\n",
- " yield os.path.join(dirpath, file_name)"
- ]
- },
- {
- "source": [
- "`ExampleFileLoaderDataPipe` registered as `load_files_as_string` consumes file names from source_datapipe and yields file names and file lines."
- ],
- "cell_type": "markdown",
- "metadata": {}
- },
- {
- "cell_type": "code",
- "execution_count": 2,
- "metadata": {},
- "outputs": [],
- "source": [
- "@functional_datapipe('load_files_as_string')\n",
- "class ExampleFileLoaderDataPipe(IterDataPipe):\n",
- " def __init__(self, source_datapipe):\n",
- " self.source_datapipe = source_datapipe\n",
- "\n",
- " def __iter__(self):\n",
- " for file_name in self.source_datapipe:\n",
- " with open(file_name) as file:\n",
- " lines = file.read()\n",
- " yield (file_name, lines)\n"
- ]
- },
- {
- "source": [
- "`ExampleCSVParserDataPipe` registered as `parse_csv_files` consumes file lines and expands them as CSV rows."
- ],
- "cell_type": "markdown",
- "metadata": {}
- },
- {
- "cell_type": "code",
- "execution_count": 3,
- "metadata": {},
- "outputs": [],
- "source": [
- "@functional_datapipe('parse_csv_files')\n",
- "class ExampleCSVParserDataPipe(IterDataPipe):\n",
- " def __init__(self, source_datapipe):\n",
- " self.source_datapipe = source_datapipe\n",
- "\n",
- " def __iter__(self):\n",
- " for file_name, lines in self.source_datapipe:\n",
- " reader = csv.reader(io.StringIO(lines))\n",
- " for row in reader:\n",
- " yield [file_name] + row\n"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 4,
- "metadata": {},
- "outputs": [
- {
- "output_type": "stream",
- "name": "stdout",
- "text": [
- "['/home/vitaly/dataset/data/datapipes/load/iter/test/example_2.csv', '10', \" 'foo'\"]\n['/home/vitaly/dataset/data/datapipes/load/iter/test/example_2.csv', '11', \" 'bar'\"]\n['/home/vitaly/dataset/data/datapipes/load/iter/test/example_1.csv', '12', \" 'aaaa'\"]\n['/home/vitaly/dataset/data/datapipes/load/iter/test/example_1.csv', '13', \" 'bbbb'\"]\n"
- ]
- }
- ],
- "source": [
- "FOLDER = 'define your folder with csv files here'\n",
- "FOLDER = '/home/vitaly/dataset/data'\n",
- "dp = ExampleListFilesDataPipe(root = FOLDER).filter(lambda filename: filename.endswith('.csv')).load_files_as_string().parse_csv_files()\n",
- "\n",
- "for data in dp:\n",
- " print(data)"
- ]
- },
- {
- "source": [
- "This approach allows to replace any DataPipe to get different functionality. For example you can pick individual files.\n"
- ],
- "cell_type": "markdown",
- "metadata": {}
- },
- {
- "cell_type": "code",
- "execution_count": 5,
- "metadata": {},
- "outputs": [
- {
- "output_type": "stream",
- "name": "stdout",
- "text": [
- "['/home/vitaly/dataset/data/datapipes/load/iter/test/example_1.csv', '12', \" 'aaaa'\"]\n['/home/vitaly/dataset/data/datapipes/load/iter/test/example_1.csv', '13', \" 'bbbb'\"]\n"
- ]
- }
- ],
- "source": [
- "FILE = 'define your file with csv data here'\n",
- "FILE = '/home/vitaly/dataset/data/datapipes/load/iter/test/example_1.csv'\n",
- "dp = ExampleFileLoaderDataPipe([FILE]).parse_csv_files()\n",
- "\n",
- "for data in dp:\n",
- " print(data)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": []
- }
- ]
-}
\ No newline at end of file