doc: Provide 2 examples of SimpleProtocol parser
authorisaacs <i@izs.me>
Thu, 28 Feb 2013 23:42:55 +0000 (15:42 -0800)
committerisaacs <i@izs.me>
Fri, 1 Mar 2013 01:38:17 +0000 (17:38 -0800)
The first example uses Readable, and shows the use of
readable.unshift().  The second uses the Transform class, showing that
it's much simpler in this case.

doc/api/stream.markdown

index ebb771dd5591fd53b9358d7a4f90bcb1a350ffc3..86f45ca9f9f8ed36276eccb858c6e58e8ec678d4 100644 (file)
@@ -84,7 +84,7 @@ method.
 A `Readable Stream` has the following methods, members, and events.
 
 Note that `stream.Readable` is an abstract class designed to be
-extended with an underlying implementation of the `_read(size, cb)`
+extended with an underlying implementation of the `_read(size)`
 method. (See below.)
 
 ### new stream.Readable([options])
@@ -105,32 +105,39 @@ In classes that extend the Readable class, make sure to call the
 constructor so that the buffering settings can be properly
 initialized.
 
-### readable.\_read(size, callback)
+### readable.\_read(size)
 
 * `size` {Number} Number of bytes to read asynchronously
 * `callback` {Function} Called with an error or with data
 
-All Readable stream implementations must provide a `_read` method
-to fetch data from the underlying resource.
-
-Note: **This function MUST NOT be called directly.**  It should be
+Note: **This function should NOT be called directly.**  It should be
 implemented by child classes, and called by the internal Readable
 class methods only.
 
-Call the callback using the standard `callback(error, data)` pattern.
-When no more data can be fetched, call `callback(null, null)` to
-signal the EOF.
+All Readable stream implementations must provide a `_read` method
+to fetch data from the underlying resource.
 
 This method is prefixed with an underscore because it is internal to
 the class that defines it, and should not be called directly by user
 programs.  However, you **are** expected to override this method in
 your own extension classes.
 
+When data is available, put it into the read queue by calling
+`readable.push(chunk)`.  If `push` returns false, then you should stop
+reading.  When `_read` is called again, you should start pushing more
+data.
+
 ### readable.push(chunk)
 
 * `chunk` {Buffer | null | String} Chunk of data to push into the read queue
 * return {Boolean} Whether or not more pushes should be performed
 
+Note: **This function should be called by Readable implementors, NOT
+by consumers of Readable subclasses.**  The `_read()` function will not
+be called again until at least one `push(chunk)` call is made.  If no
+data is available, then you MAY call `push('')` (an empty string) to
+allow a future `_read` call, without adding any data to the queue.
+
 The `Readable` class works by putting data into a read queue to be
 pulled out later by calling the `read()` method when the `'readable'`
 event fires.
@@ -167,6 +174,115 @@ stream._read = function(size, cb) {
 };
 ```
 
+### readable.unshift(chunk)
+
+* `chunk` {Buffer | null | String} Chunk of data to unshift onto the read queue
+* return {Boolean} Whether or not more pushes should be performed
+
+This is the corollary of `readable.push(chunk)`.  Rather than putting
+the data at the *end* of the read queue, it puts it at the *front* of
+the read queue.
+
+This is useful in certain use-cases where a stream is being consumed
+by a parser, which needs to "un-consume" some data that it has
+optimistically pulled out of the source.
+
+```javascript
+// A parser for a simple data protocol.
+// The "header" is a JSON object, followed by 2 \n characters, and
+// then a message body.
+//
+// Note: This can be done more simply as a Transform stream.  See below.
+
+function SimpleProtocol(source, options) {
+  if (!(this instanceof SimpleProtocol))
+    return new SimpleProtocol(options);
+
+  Readable.call(this, options);
+  this._inBody = false;
+  this._sawFirstCr = false;
+
+  // source is a readable stream, such as a socket or file
+  this._source = source;
+
+  var self = this;
+  source.on('end', function() {
+    self.push(null);
+  });
+
+  // give it a kick whenever the source is readable
+  // read(0) will not consume any bytes
+  source.on('readable', function() {
+    self.read(0);
+  });
+
+  this._rawHeader = [];
+  this.header = null;
+}
+
+SimpleProtocol.prototype = Object.create(
+  Readable.prototype, { constructor: { value: SimpleProtocol }});
+
+SimpleProtocol.prototype._read = function(n) {
+  if (!this._inBody) {
+    var chunk = this._source.read();
+
+    // if the source doesn't have data, we don't have data yet.
+    if (chunk === null)
+      return this.push('');
+
+    // check if the chunk has a \n\n
+    var split = -1;
+    for (var i = 0; i < chunk.length; i++) {
+      if (chunk[i] === 10) { // '\n'
+        if (this._sawFirstCr) {
+          split = i;
+          break;
+        } else {
+          this._sawFirstCr = true;
+        }
+      } else {
+        this._sawFirstCr = false;
+      }
+    }
+
+    if (split === -1) {
+      // still waiting for the \n\n
+      // stash the chunk, and try again.
+      this._rawHeader.push(chunk);
+      this.push('');
+    } else {
+      this._inBody = true;
+      var h = chunk.slice(0, split);
+      this._rawHeader.push(h);
+      var header = Buffer.concat(this._rawHeader).toString();
+      try {
+        this.header = JSON.parse(header);
+      } catch (er) {
+        this.emit('error', new Error('invalid simple protocol data'));
+        return;
+      }
+      // now, because we got some extra data, unshift the rest
+      // back into the read queue so that our consumer will see it.
+      this.unshift(b);
+
+      // and let them know that we are done parsing the header.
+      this.emit('header', this.header);
+    }
+  } else {
+    // from there on, just provide the data to our consumer.
+    // careful not to push(null), since that would indicate EOF.
+    var chunk = this._source.read();
+    if (chunk) this.push(chunk);
+  }
+};
+
+// Usage:
+var parser = new SimpleProtocol(source);
+// Now parser is a readable stream that will emit 'header'
+// with the parsed header data.
+```
+
 ### readable.wrap(stream)
 
 * `stream` {Stream} An "old style" readable stream
@@ -232,6 +348,8 @@ constructor.
 * `size` {Number | null} Optional number of bytes to read.
 * Return: {Buffer | String | null}
 
+Note: **This function SHOULD be called by Readable stream users.**
+
 Call this method to consume data once the `'readable'` event is
 emitted.
 
@@ -243,8 +361,8 @@ If there is no data to consume, or if there are fewer bytes in the
 internal buffer than the `size` argument, then `null` is returned, and
 a future `'readable'` event will be emitted when more is available.
 
-Note that calling `stream.read(0)` will always return `null`, and will
-trigger a refresh of the internal buffer, but otherwise be a no-op.
+Calling `stream.read(0)` will always return `null`, and will trigger a
+refresh of the internal buffer, but otherwise be a no-op.
 
 ### readable.pipe(destination, [options])
 
@@ -416,14 +534,14 @@ A "duplex" stream is one that is both Readable and Writable, such as a
 TCP socket connection.
 
 Note that `stream.Duplex` is an abstract class designed to be
-extended with an underlying implementation of the `_read(size, cb)`
+extended with an underlying implementation of the `_read(size)`
 and `_write(chunk, callback)` methods as you would with a Readable or
 Writable stream class.
 
 Since JavaScript doesn't have multiple prototypal inheritance, this
 class prototypally inherits from Readable, and then parasitically from
 Writable.  It is thus up to the user to implement both the lowlevel
-`_read(n,cb)` method as well as the lowlevel `_write(chunk,cb)` method
+`_read(n)` method as well as the lowlevel `_write(chunk,cb)` method
 on extension duplex classes.
 
 ### new stream.Duplex(options)
@@ -471,13 +589,13 @@ initialized.
 * `callback` {Function} Call this function (optionally with an error
   argument) when you are done processing the supplied chunk.
 
-All Transform stream implementations must provide a `_transform`
-method to accept input and produce output.
-
 Note: **This function MUST NOT be called directly.**  It should be
 implemented by child classes, and called by the internal Transform
 class methods only.
 
+All Transform stream implementations must provide a `_transform`
+method to accept input and produce output.
+
 `_transform` should do whatever has to be done in this specific
 Transform class, to handle the bytes being written, and pass them off
 to the readable portion of the interface.  Do asynchronous I/O,
@@ -521,6 +639,82 @@ the class that defines it, and should not be called directly by user
 programs.  However, you **are** expected to override this method in
 your own extension classes.
 
+### Example: `SimpleProtocol` parser
+
+The example above of a simple protocol parser can be implemented much
+more simply by using the higher level `Transform` stream class.
+
+In this example, rather than providing the input as an argument, it
+would be piped into the parser, which is a more idiomatic Node stream
+approach.
+
+```javascript
+function SimpleProtocol(options) {
+  if (!(this instanceof SimpleProtocol))
+    return new SimpleProtocol(options);
+
+  Transform.call(this, options);
+  this._inBody = false;
+  this._sawFirstCr = false;
+  this._rawHeader = [];
+  this.header = null;
+}
+
+SimpleProtocol.prototype = Object.create(
+  Transform.prototype, { constructor: { value: SimpleProtocol }});
+
+SimpleProtocol.prototype._transform = function(chunk, output, done) {
+  if (!this._inBody) {
+    // check if the chunk has a \n\n
+    var split = -1;
+    for (var i = 0; i < chunk.length; i++) {
+      if (chunk[i] === 10) { // '\n'
+        if (this._sawFirstCr) {
+          split = i;
+          break;
+        } else {
+          this._sawFirstCr = true;
+        }
+      } else {
+        this._sawFirstCr = false;
+      }
+    }
+
+    if (split === -1) {
+      // still waiting for the \n\n
+      // stash the chunk, and try again.
+      this._rawHeader.push(chunk);
+    } else {
+      this._inBody = true;
+      var h = chunk.slice(0, split);
+      this._rawHeader.push(h);
+      var header = Buffer.concat(this._rawHeader).toString();
+      try {
+        this.header = JSON.parse(header);
+      } catch (er) {
+        this.emit('error', new Error('invalid simple protocol data'));
+        return;
+      }
+      // and let them know that we are done parsing the header.
+      this.emit('header', this.header);
+
+      // now, because we got some extra data, emit this first.
+      output(b);
+    }
+  } else {
+    // from there on, just provide the data to our consumer as-is.
+    output(b);
+  }
+  done();
+};
+
+var parser = new SimpleProtocol();
+source.pipe(parser)
+
+// Now parser is a readable stream that will emit 'header'
+// with the parsed header data.
+```
+
 
 ## Class: stream.PassThrough