zlib: streams2
authorisaacs <i@izs.me>
Tue, 2 Oct 2012 23:15:39 +0000 (16:15 -0700)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 18:52:26 +0000 (10:52 -0800)
lib/zlib.js
src/node_zlib.cc
test/simple/test-zlib-destroy.js [deleted file]
test/simple/test-zlib-invalid-input.js

index 9b56241..bc3e933 100644 (file)
 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 // USE OR OTHER DEALINGS IN THE SOFTWARE.
 
+var Transform = require('_stream_transform');
+
 var binding = process.binding('zlib');
 var util = require('util');
-var Stream = require('stream');
 var assert = require('assert').ok;
 
 // zlib doesn't provide these, so kludge them in following the same
@@ -138,33 +139,35 @@ function zlibBuffer(engine, buffer, callback) {
   var buffers = [];
   var nread = 0;
 
+  engine.on('error', onError);
+  engine.on('end', onEnd);
+
+  engine.end(buffer);
+  flow();
+
+  function flow() {
+    var chunk;
+    while (null !== (chunk = engine.read())) {
+      buffers.push(chunk);
+      nread += chunk.length;
+    }
+    engine.once('readable', flow);
+  }
+
   function onError(err) {
     engine.removeListener('end', onEnd);
-    engine.removeListener('error', onError);
+    engine.removeListener('readable', flow);
     callback(err);
   }
 
-  function onData(chunk) {
-    buffers.push(chunk);
-    nread += chunk.length;
-  }
-
   function onEnd() {
     var buf = Buffer.concat(buffers, nread);
     buffers = [];
     callback(null, buf);
   }
-
-  engine.on('error', onError);
-  engine.on('data', onData);
-  engine.on('end', onEnd);
-
-  engine.write(buffer);
-  engine.end();
 }
 
 
-
 // generic zlib
 // minimal 2-byte header
 function Deflate(opts) {
@@ -217,15 +220,13 @@ function Unzip(opts) {
 // you call the .write() method.
 
 function Zlib(opts, mode) {
-  Stream.call(this);
-
   this._opts = opts = opts || {};
-  this._queue = [];
-  this._processing = false;
-  this._ended = false;
-  this.readable = true;
-  this.writable = true;
-  this._flush = binding.Z_NO_FLUSH;
+  this._chunkSize = opts.chunkSize || exports.Z_DEFAULT_CHUNK;
+
+  Transform.call(this, opts);
+
+  // means a different thing there.
+  this._readableState.chunkSize = null;
 
   if (opts.chunkSize) {
     if (opts.chunkSize < exports.Z_MIN_CHUNK ||
@@ -274,13 +275,12 @@ function Zlib(opts, mode) {
   this._binding = new binding.Zlib(mode);
 
   var self = this;
+  this._hadError = false;
   this._binding.onerror = function(message, errno) {
     // there is no way to cleanly recover.
     // continuing only obscures problems.
     self._binding = null;
     self._hadError = true;
-    self._queue.length = 0;
-    self._processing = false;
 
     var error = new Error(message);
     error.errno = errno;
@@ -294,7 +294,6 @@ function Zlib(opts, mode) {
                      opts.strategy || exports.Z_DEFAULT_STRATEGY,
                      opts.dictionary);
 
-  this._chunkSize = opts.chunkSize || exports.Z_DEFAULT_CHUNK;
   this._buffer = new Buffer(this._chunkSize);
   this._offset = 0;
   this._closed = false;
@@ -302,59 +301,47 @@ function Zlib(opts, mode) {
   this.once('end', this.close);
 }
 
-util.inherits(Zlib, Stream);
-
-Zlib.prototype.write = function write(chunk, cb) {
-  if (this._hadError) return true;
-
-  if (this._ended) {
-    return this.emit('error', new Error('Cannot write after end'));
-  }
-
-  if (arguments.length === 1 && typeof chunk === 'function') {
-    cb = chunk;
-    chunk = null;
-  }
-
-  if (!chunk) {
-    chunk = null;
-  } else if (typeof chunk === 'string') {
-    chunk = new Buffer(chunk);
-  } else if (!Buffer.isBuffer(chunk)) {
-    return this.emit('error', new Error('Invalid argument'));
-  }
-
-
-  var empty = this._queue.length === 0;
-
-  this._queue.push([chunk, cb]);
-  this._process();
-  if (!empty) {
-    this._needDrain = true;
-  }
-  return empty;
-};
+util.inherits(Zlib, Transform);
 
 Zlib.prototype.reset = function reset() {
   return this._binding.reset();
 };
 
-Zlib.prototype.flush = function flush(cb) {
-  this._flush = binding.Z_SYNC_FLUSH;
-  return this.write(cb);
+Zlib.prototype._flush = function(output, callback) {
+  var rs = this._readableState;
+  var self = this;
+  this._transform(null, output, function(er) {
+    if (er)
+      return callback(er);
+
+    // now a weird thing happens... it could be that you called flush
+    // but everything had already actually been consumed, but it wasn't
+    // enough to get over the Readable class's lowWaterMark.
+    // In that case, we emit 'readable' now to make sure it's consumed.
+    if (rs.length &&
+        rs.length < rs.lowWaterMark &&
+        !rs.ended &&
+        rs.needReadable)
+      self.emit('readable');
+
+    callback();
+  });
 };
 
-Zlib.prototype.end = function end(chunk, cb) {
-  if (this._hadError) return true;
+Zlib.prototype.flush = function(callback) {
+  var ws = this._writableState;
+  var ts = this._transformState;
 
-  var self = this;
-  this._ending = true;
-  var ret = this.write(chunk, function() {
-    self.emit('end');
-    if (cb) cb();
-  });
-  this._ended = true;
-  return ret;
+  if (ws.writing) {
+    ws.needDrain = true;
+    var self = this;
+    this.once('drain', function() {
+      self._flush(ts.output, callback);
+    });
+    return;
+  }
+
+  this._flush(ts.output, callback || function() {});
 };
 
 Zlib.prototype.close = function(callback) {
@@ -368,37 +355,37 @@ Zlib.prototype.close = function(callback) {
 
   this._binding.close();
 
-  process.nextTick(this.emit.bind(this, 'close'));
+  var self = this;
+  process.nextTick(function() {
+    self.emit('close');
+  });
 };
 
-Zlib.prototype._process = function() {
-  if (this._hadError) return;
-
-  if (this._processing || this._paused) return;
-
-  if (this._queue.length === 0) {
-    if (this._needDrain) {
-      this._needDrain = false;
-      this.emit('drain');
-    }
-    // nothing to do, waiting for more data at this point.
-    return;
-  }
-
-  var req = this._queue.shift();
-  var cb = req.pop();
-  var chunk = req.pop();
-
-  if (this._ending && this._queue.length === 0) {
-    this._flush = binding.Z_FINISH;
-  }
+Zlib.prototype._transform = function(chunk, output, cb) {
+  var flushFlag;
+  var ws = this._writableState;
+  var ending = ws.ending || ws.ended;
+  var last = ending && (!chunk || ws.length === chunk.length);
+
+  if (chunk !== null && !Buffer.isBuffer(chunk))
+    return cb(new Error('invalid input'));
+
+  // If it's the last chunk, or a final flush, we use the Z_FINISH flush flag.
+  // If it's explicitly flushing at some other time, then we use
+  // Z_FULL_FLUSH. Otherwise, use Z_NO_FLUSH for maximum compression
+  // goodness.
+  if (last)
+    flushFlag = binding.Z_FINISH;
+  else if (chunk === null)
+    flushFlag = binding.Z_FULL_FLUSH;
+  else
+    flushFlag = binding.Z_NO_FLUSH;
 
-  var self = this;
   var availInBefore = chunk && chunk.length;
   var availOutBefore = this._chunkSize - this._offset;
-
   var inOff = 0;
-  var req = this._binding.write(this._flush,
+
+  var req = this._binding.write(flushFlag,
                                 chunk, // in
                                 inOff, // in_off
                                 availInBefore, // in_len
@@ -408,23 +395,23 @@ Zlib.prototype._process = function() {
 
   req.buffer = chunk;
   req.callback = callback;
-  this._processing = req;
 
+  var self = this;
   function callback(availInAfter, availOutAfter, buffer) {
-    if (self._hadError) return;
+    if (self._hadError)
+      return;
 
     var have = availOutBefore - availOutAfter;
-
     assert(have >= 0, 'have should not go down');
 
     if (have > 0) {
       var out = self._buffer.slice(self._offset, self._offset + have);
       self._offset += have;
-      self.emit('data', out);
+      // serve some output to the consumer.
+      output(out);
     }
 
-    // XXX Maybe have a 'min buffer' size so we don't dip into the
-    // thread pool with only 1 byte available or something?
+    // exhausted the output buffer, or used all the input create a new one.
     if (availOutAfter === 0 || self._offset >= self._chunkSize) {
       availOutBefore = self._chunkSize;
       self._offset = 0;
@@ -439,7 +426,7 @@ Zlib.prototype._process = function() {
       inOff += (availInBefore - availInAfter);
       availInBefore = availInAfter;
 
-      var newReq = self._binding.write(self._flush,
+      var newReq = self._binding.write(flushFlag,
                                        chunk,
                                        inOff,
                                        availInBefore,
@@ -448,34 +435,14 @@ Zlib.prototype._process = function() {
                                        self._chunkSize);
       newReq.callback = callback; // this same function
       newReq.buffer = chunk;
-      self._processing = newReq;
       return;
     }
 
     // finished with the chunk.
-    self._processing = false;
-    if (cb) cb();
-    self._process();
+    cb();
   }
 };
 
-Zlib.prototype.pause = function() {
-  this._paused = true;
-  this.emit('pause');
-};
-
-Zlib.prototype.resume = function() {
-  this._paused = false;
-  this._process();
-};
-
-Zlib.prototype.destroy = function() {
-  this.readable = false;
-  this.writable = false;
-  this._ended = true;
-  this.emit('close');
-};
-
 util.inherits(Deflate, Zlib);
 util.inherits(Inflate, Zlib);
 util.inherits(Gzip, Zlib);
index 13f94e9..881b20c 100644 (file)
@@ -109,7 +109,19 @@ class ZCtx : public ObjectWrap {
     assert(!ctx->write_in_progress_ && "write already in progress");
     ctx->write_in_progress_ = true;
 
+    assert(!args[0]->IsUndefined() && "must provide flush value");
+
     unsigned int flush = args[0]->Uint32Value();
+
+    if (flush != Z_NO_FLUSH &&
+        flush != Z_PARTIAL_FLUSH &&
+        flush != Z_SYNC_FLUSH &&
+        flush != Z_FULL_FLUSH &&
+        flush != Z_FINISH &&
+        flush != Z_BLOCK) {
+      assert(0 && "Invalid flush value");
+    }
+
     Bytef *in;
     Bytef *out;
     size_t in_off, in_len, out_off, out_len;
@@ -483,6 +495,7 @@ void InitZlib(Handle<Object> target) {
   callback_sym = NODE_PSYMBOL("callback");
   onerror_sym = NODE_PSYMBOL("onerror");
 
+  // valid flush values.
   NODE_DEFINE_CONSTANT(target, Z_NO_FLUSH);
   NODE_DEFINE_CONSTANT(target, Z_PARTIAL_FLUSH);
   NODE_DEFINE_CONSTANT(target, Z_SYNC_FLUSH);
diff --git a/test/simple/test-zlib-destroy.js b/test/simple/test-zlib-destroy.js
deleted file mode 100644 (file)
index 7a1120e..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-// Copyright Joyent, Inc. and other Node contributors.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a
-// copy of this software and associated documentation files (the
-// "Software"), to deal in the Software without restriction, including
-// without limitation the rights to use, copy, modify, merge, publish,
-// distribute, sublicense, and/or sell copies of the Software, and to permit
-// persons to whom the Software is furnished to do so, subject to the
-// following conditions:
-//
-// The above copyright notice and this permission notice shall be included
-// in all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
-// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
-// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
-// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
-// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
-// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
-// USE OR OTHER DEALINGS IN THE SOFTWARE.
-
-var common = require('../common');
-var assert = require('assert');
-var zlib = require('zlib');
-
-['Deflate', 'Inflate', 'Gzip', 'Gunzip', 'DeflateRaw', 'InflateRaw', 'Unzip']
-  .forEach(function (name) {
-    var a = false;
-    var zStream = new zlib[name]();
-    zStream.on('close', function () {
-      a = true;
-    });
-    zStream.destroy();
-
-    assert.equal(a, true, name+'#destroy() must emit \'close\'');
-  });
index f97c583..c3d8b5b 100644 (file)
@@ -50,13 +50,6 @@ unzips.forEach(function (uz, i) {
   uz.on('error', function(er) {
     console.error('Error event', er);
     hadError[i] = true;
-
-    // to be friendly to the Stream API, zlib objects just return true and
-    // ignore data on the floor after an error.  It's up to the user to
-    // catch the 'error' event and do something intelligent.  They do not
-    // emit any more data, however.
-    assert.equal(uz.write('also invalid'), true);
-    assert.equal(uz.end(), true);
   });
 
   uz.on('end', function(er) {