fs: streams2
authorisaacs <i@izs.me>
Fri, 5 Oct 2012 00:44:48 +0000 (17:44 -0700)
committerisaacs <i@izs.me>
Fri, 14 Dec 2012 18:52:26 +0000 (10:52 -0800)
lib/fs.js
test/simple/test-file-write-stream.js
test/simple/test-file-write-stream2.js
test/simple/test-fs-read-stream.js

index 83bacc9..96af631 100644 (file)
--- a/lib/fs.js
+++ b/lib/fs.js
@@ -34,6 +34,9 @@ var fs = exports;
 var Stream = require('stream').Stream;
 var EventEmitter = require('events').EventEmitter;
 
+var Readable = Stream.Readable;
+var Writable = Stream.Writable;
+
 var kMinPoolSpace = 128;
 var kPoolSize = 40 * 1024;
 
@@ -1386,34 +1389,30 @@ fs.createReadStream = function(path, options) {
   return new ReadStream(path, options);
 };
 
-var ReadStream = fs.ReadStream = function(path, options) {
-  if (!(this instanceof ReadStream)) return new ReadStream(path, options);
-
-  Stream.call(this);
-
-  var self = this;
-
-  this.path = path;
-  this.fd = null;
-  this.readable = true;
-  this.paused = false;
+util.inherits(ReadStream, Readable);
+fs.ReadStream = ReadStream;
 
-  this.flags = 'r';
-  this.mode = 438; /*=0666*/
-  this.bufferSize = 64 * 1024;
+function ReadStream(path, options) {
+  if (!(this instanceof ReadStream))
+    return new ReadStream(path, options);
 
-  options = options || {};
+  // a little bit bigger buffer and water marks by default
+  options = util._extend({
+    bufferSize: 64 * 1024,
+    lowWaterMark: 16 * 1024,
+    highWaterMark: 64 * 1024
+  }, options || {});
 
-  // Mixin options into this
-  var keys = Object.keys(options);
-  for (var index = 0, length = keys.length; index < length; index++) {
-    var key = keys[index];
-    this[key] = options[key];
-  }
+  Readable.call(this, options);
 
-  assertEncoding(this.encoding);
+  this.path = path;
+  this.fd = options.hasOwnProperty('fd') ? options.fd : null;
+  this.flags = options.hasOwnProperty('flags') ? options.flags : 'r';
+  this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/
 
-  if (this.encoding) this.setEncoding(this.encoding);
+  this.start = options.hasOwnProperty('start') ? options.start : undefined;
+  this.end = options.hasOwnProperty('start') ? options.end : undefined;
+  this.pos = undefined;
 
   if (this.start !== undefined) {
     if ('number' !== typeof this.start) {
@@ -1432,41 +1431,40 @@ var ReadStream = fs.ReadStream = function(path, options) {
     this.pos = this.start;
   }
 
-  if (this.fd !== null) {
-    process.nextTick(function() {
-      self._read();
-    });
-    return;
-  }
+  if (typeof this.fd !== 'number')
+    this.open();
 
-  fs.open(this.path, this.flags, this.mode, function(err, fd) {
-    if (err) {
-      self.emit('error', err);
-      self.readable = false;
+  this.on('end', function() {
+    this.destroy();
+  });
+}
+
+fs.FileReadStream = fs.ReadStream; // support the legacy name
+
+ReadStream.prototype.open = function() {
+  var self = this;
+  fs.open(this.path, this.flags, this.mode, function(er, fd) {
+    if (er) {
+      self.destroy();
+      self.emit('error', er);
       return;
     }
 
     self.fd = fd;
     self.emit('open', fd);
-    self._read();
+    // start the flow of data.
+    self.read();
   });
 };
-util.inherits(ReadStream, Stream);
-
-fs.FileReadStream = fs.ReadStream; // support the legacy name
-
-ReadStream.prototype.setEncoding = function(encoding) {
-  assertEncoding(encoding);
-  var StringDecoder = require('string_decoder').StringDecoder; // lazy load
-  this._decoder = new StringDecoder(encoding);
-};
 
+ReadStream.prototype._read = function(n, cb) {
+  if (typeof this.fd !== 'number')
+    return this.once('open', function() {
+      this._read(n, cb);
+    });
 
-ReadStream.prototype._read = function() {
-  var self = this;
-  if (!this.readable || this.paused || this.reading) return;
-
-  this.reading = true;
+  if (this.destroyed)
+    return;
 
   if (!pool || pool.length - pool.used < kMinPoolSpace) {
     // discard the old pool. Can't add to the free list because
@@ -1475,149 +1473,110 @@ ReadStream.prototype._read = function() {
     allocNewPool();
   }
 
-  // Grab another reference to the pool in the case that while we're in the
-  // thread pool another read() finishes up the pool, and allocates a new
-  // one.
+  // Grab another reference to the pool in the case that while we're
+  // in the thread pool another read() finishes up the pool, and
+  // allocates a new one.
   var thisPool = pool;
-  var toRead = Math.min(pool.length - pool.used, ~~this.bufferSize);
+  var toRead = Math.min(pool.length - pool.used, n);
   var start = pool.used;
 
-  if (this.pos !== undefined) {
+  if (this.pos !== undefined)
     toRead = Math.min(this.end - this.pos + 1, toRead);
-  }
 
-  function afterRead(err, bytesRead) {
-    self.reading = false;
-    if (err) {
-      fs.close(self.fd, function() {
-        self.fd = null;
-        self.emit('error', err);
-        self.readable = false;
-      });
-      return;
-    }
+  // already read everything we were supposed to read!
+  // treat as EOF.
+  if (toRead <= 0)
+    return cb();
 
-    if (bytesRead === 0) {
-      if (this._decoder) {
-        var ret = this._decoder.end();
-        if (ret)
-          this.emit('data', ret);
-      }
-      self.emit('end');
-      self.destroy();
-      return;
-    }
-
-    var b = thisPool.slice(start, start + bytesRead);
+  // the actual read.
+  var self = this;
+  fs.read(this.fd, pool, pool.used, toRead, this.pos, onread);
 
-    // Possible optimizition here?
-    // Reclaim some bytes if bytesRead < toRead?
-    // Would need to ensure that pool === thisPool.
+  // move the pool positions, and internal position for reading.
+  if (this.pos !== undefined)
+    this.pos += toRead;
+  pool.used += toRead;
 
-    // do not emit events if the stream is paused
-    if (self.paused) {
-      self.buffer = b;
-      return;
+  function onread(er, bytesRead) {
+    if (er) {
+      self.destroy();
+      return cb(er);
     }
 
-    // do not emit events anymore after we declared the stream unreadable
-    if (!self.readable) return;
+    var b = null;
+    if (bytesRead > 0)
+      b = thisPool.slice(start, start + bytesRead);
 
-    self._emitData(b);
-    self._read();
+    cb(null, b);
   }
-
-  fs.read(this.fd, pool, pool.used, toRead, this.pos, afterRead);
-
-  if (this.pos !== undefined) {
-    this.pos += toRead;
-  }
-  pool.used += toRead;
 };
 
 
-ReadStream.prototype._emitData = function(d) {
-  if (this._decoder) {
-    var string = this._decoder.write(d);
-    if (string.length) this.emit('data', string);
-  } else {
-    this.emit('data', d);
-  }
+ReadStream.prototype.destroy = function() {
+  if (this.destroyed)
+    return;
+  this.destroyed = true;
+
+  if ('number' === typeof this.fd)
+    this.close();
 };
 
 
-ReadStream.prototype.destroy = function() {
+ReadStream.prototype.close = function(cb) {
+  if (cb)
+    this.once('close', cb);
+  if (this.closed || 'number' !== typeof this.fd) {
+    if ('number' !== typeof this.fd)
+      this.once('open', close);
+    return process.nextTick(this.emit.bind(this, 'close'));
+  }
+  this.closed = true;
   var self = this;
-
-  if (!this.readable) return;
-  this.readable = false;
+  close();
 
   function close() {
-    fs.close(self.fd, function(err) {
-      if (err) {
-        self.emit('error', err);
-      } else {
+    fs.close(self.fd, function(er) {
+      if (er)
+        self.emit('error', er);
+      else
         self.emit('close');
-      }
     });
-  }
-
-  if (this.fd === null) {
-    this.addListener('open', close);
-  } else {
-    close();
+    self.fd = null;
   }
 };
 
 
-ReadStream.prototype.pause = function() {
-  this.paused = true;
-};
-
-
-ReadStream.prototype.resume = function() {
-  this.paused = false;
-
-  if (this.buffer) {
-    var buffer = this.buffer;
-    this.buffer = null;
-    this._emitData(buffer);
-  }
-
-  // hasn't opened yet.
-  if (null == this.fd) return;
-
-  this._read();
-};
-
 
 
 fs.createWriteStream = function(path, options) {
   return new WriteStream(path, options);
 };
 
-var WriteStream = fs.WriteStream = function(path, options) {
-  if (!(this instanceof WriteStream)) return new WriteStream(path, options);
+util.inherits(WriteStream, Writable);
+fs.WriteStream = WriteStream;
+function WriteStream(path, options) {
+  if (!(this instanceof WriteStream))
+    return new WriteStream(path, options);
 
-  Stream.call(this);
+  // a little bit bigger buffer and water marks by default
+  options = util._extend({
+    bufferSize: 64 * 1024,
+    lowWaterMark: 16 * 1024,
+    highWaterMark: 64 * 1024
+  }, options || {});
+
+  Writable.call(this, options);
 
   this.path = path;
   this.fd = null;
-  this.writable = true;
-
-  this.flags = 'w';
-  this.encoding = 'binary';
-  this.mode = 438; /*=0666*/
-  this.bytesWritten = 0;
 
-  options = options || {};
+  this.fd = options.hasOwnProperty('fd') ? options.fd : null;
+  this.flags = options.hasOwnProperty('flags') ? options.flags : 'w';
+  this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/
 
-  // Mixin options into this
-  var keys = Object.keys(options);
-  for (var index = 0, length = keys.length; index < length; index++) {
-    var key = keys[index];
-    this[key] = options[key];
-  }
+  this.start = options.hasOwnProperty('start') ? options.start : undefined;
+  this.pos = undefined;
+  this.bytesWritten = 0;
 
   if (this.start !== undefined) {
     if ('number' !== typeof this.start) {
@@ -1630,154 +1589,54 @@ var WriteStream = fs.WriteStream = function(path, options) {
     this.pos = this.start;
   }
 
-  this.busy = false;
-  this._queue = [];
+  if ('number' !== typeof this.fd)
+    this.open();
 
-  if (this.fd === null) {
-    this._open = fs.open;
-    this._queue.push([this._open, this.path, this.flags, this.mode, undefined]);
-    this.flush();
-  }
-};
-util.inherits(WriteStream, Stream);
+  // dispose on finish.
+  this.once('finish', this.close);
+}
 
 fs.FileWriteStream = fs.WriteStream; // support the legacy name
 
-WriteStream.prototype.flush = function() {
-  if (this.busy) return;
-  var self = this;
-
-  var args = this._queue.shift();
-  if (!args) {
-    if (this.drainable) { this.emit('drain'); }
-    return;
-  }
-
-  this.busy = true;
-
-  var method = args.shift(),
-      cb = args.pop();
-
-  args.push(function(err) {
-    self.busy = false;
-
-    if (err) {
-      self.writable = false;
-
-      function emit() {
-        self.fd = null;
-        if (cb) cb(err);
-        self.emit('error', err);
-      }
-
-      if (self.fd === null) {
-        emit();
-      } else {
-        fs.close(self.fd, emit);
-      }
-
-      return;
-    }
 
-    if (method == fs.write) {
-      self.bytesWritten += arguments[1];
-      if (cb) {
-        // write callback
-        cb(null, arguments[1]);
-      }
-
-    } else if (method === self._open) {
-      // save reference for file pointer
-      self.fd = arguments[1];
-      self.emit('open', self.fd);
-
-    } else if (method === fs.close) {
-      // stop flushing after close
-      if (cb) {
-        cb(null);
-      }
-      self.emit('close');
+WriteStream.prototype.open = function() {
+  fs.open(this.path, this.flags, this.mode, function(er, fd) {
+    if (er) {
+      this.destroy();
+      this.emit('error', er);
       return;
     }
 
-    self.flush();
-  });
-
-  // Inject the file pointer
-  if (method !== self._open) {
-    args.unshift(this.fd);
-  }
-
-  method.apply(this, args);
+    this.fd = fd;
+    this.emit('open', fd);
+  }.bind(this));
 };
 
-WriteStream.prototype.write = function(data) {
-  if (!this.writable) {
-    this.emit('error', new Error('stream not writable'));
-    return false;
-  }
 
-  this.drainable = true;
-
-  var cb;
-  if (typeof(arguments[arguments.length - 1]) == 'function') {
-    cb = arguments[arguments.length - 1];
-  }
+WriteStream.prototype._write = function(data, cb) {
+  if (!Buffer.isBuffer(data))
+    return this.emit('error', new Error('Invalid data'));
 
-  if (!Buffer.isBuffer(data)) {
-    var encoding = 'utf8';
-    if (typeof(arguments[1]) == 'string') encoding = arguments[1];
-    assertEncoding(encoding);
-    data = new Buffer('' + data, encoding);
-  }
+  if (typeof this.fd !== 'number')
+    return this.once('open', this._write.bind(this, data, cb));
 
-  this._queue.push([fs.write, data, 0, data.length, this.pos, cb]);
+  var self = this;
+  fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {
+    if (er) {
+      self.destroy();
+      return cb(er);
+    }
+    self.bytesWritten += bytes;
+    cb();
+  });
 
-  if (this.pos !== undefined) {
+  if (this.pos !== undefined)
     this.pos += data.length;
-  }
-
-  this.flush();
-
-  return false;
-};
-
-WriteStream.prototype.end = function(data, encoding, cb) {
-  if (typeof(data) === 'function') {
-    cb = data;
-  } else if (typeof(encoding) === 'function') {
-    cb = encoding;
-    this.write(data);
-  } else if (arguments.length > 0) {
-    this.write(data, encoding);
-  }
-  this.writable = false;
-  this._queue.push([fs.close, cb]);
-  this.flush();
 };
 
-WriteStream.prototype.destroy = function() {
-  var self = this;
-
-  if (!this.writable) return;
-  this.writable = false;
-
-  function close() {
-    fs.close(self.fd, function(err) {
-      if (err) {
-        self.emit('error', err);
-      } else {
-        self.emit('close');
-      }
-    });
-  }
 
-  if (this.fd === null) {
-    this.addListener('open', close);
-  } else {
-    close();
-  }
-};
+WriteStream.prototype.destroy = ReadStream.prototype.destroy;
+WriteStream.prototype.close = ReadStream.prototype.close;
 
 // There is no shutdown() for files.
 WriteStream.prototype.destroySoon = WriteStream.prototype.end;
index 5d2286c..8829544 100644 (file)
 var common = require('../common');
 var assert = require('assert');
 
-var path = require('path'),
-    fs = require('fs'),
-    fn = path.join(common.tmpDir, 'write.txt'),
-    file = fs.createWriteStream(fn),
+var path = require('path');
+var fs = require('fs');
+var fn = path.join(common.tmpDir, 'write.txt');
+var file = fs.createWriteStream(fn, {
+      lowWaterMark: 3,
+      highWaterMark: 10
+    });
 
-    EXPECTED = '012345678910',
+var EXPECTED = '012345678910';
 
-    callbacks = {
+var callbacks = {
       open: -1,
       drain: -2,
-      close: -1,
-      endCb: -1
+      close: -1
     };
 
 file
   .on('open', function(fd) {
+      console.error('open!');
       callbacks.open++;
       assert.equal('number', typeof fd);
     })
   .on('error', function(err) {
       throw err;
+      console.error('error!', err.stack);
     })
   .on('drain', function() {
+      console.error('drain!', callbacks.drain);
       callbacks.drain++;
       if (callbacks.drain == -1) {
-        assert.equal(EXPECTED, fs.readFileSync(fn));
+        assert.equal(EXPECTED, fs.readFileSync(fn, 'utf8'));
         file.write(EXPECTED);
       } else if (callbacks.drain == 0) {
-        assert.equal(EXPECTED + EXPECTED, fs.readFileSync(fn));
-        file.end(function(err) {
-          assert.ok(!err);
-          callbacks.endCb++;
-        });
+        assert.equal(EXPECTED + EXPECTED, fs.readFileSync(fn, 'utf8'));
+        file.end();
       }
     })
   .on('close', function() {
+      console.error('close!');
       assert.strictEqual(file.bytesWritten, EXPECTED.length * 2);
 
       callbacks.close++;
       assert.throws(function() {
+        console.error('write after end should not be allowed');
         file.write('should not work anymore');
       });
 
@@ -70,7 +74,7 @@ file
 
 for (var i = 0; i < 11; i++) {
   (function(i) {
-    assert.strictEqual(false, file.write(i));
+    file.write('' + i);
   })(i);
 }
 
@@ -78,4 +82,5 @@ process.on('exit', function() {
   for (var k in callbacks) {
     assert.equal(0, callbacks[k], k + ' count off by ' + callbacks[k]);
   }
+  console.log('ok');
 });
index 9b5d7ff..4f2e73c 100644 (file)
 var common = require('../common');
 var assert = require('assert');
 
-var path = require('path'),
-    fs = require('fs'),
-    util = require('util');
+var path = require('path');
+var fs = require('fs');
+var util = require('util');
 
 
-var filepath = path.join(common.tmpDir, 'write.txt'),
-    file;
+var filepath = path.join(common.tmpDir, 'write.txt');
+var file;
 
 var EXPECTED = '012345678910';
 
-var cb_expected = 'write open drain write drain close error ',
-    cb_occurred = '';
+var cb_expected = 'write open drain write drain close error ';
+var cb_occurred = '';
 
 var countDrains = 0;
 
@@ -47,6 +47,8 @@ process.on('exit', function() {
     assert.strictEqual(cb_occurred, cb_expected,
         'events missing or out of order: "' +
         cb_occurred + '" !== "' + cb_expected + '"');
+  } else {
+    console.log('ok');
   }
 });
 
@@ -59,22 +61,30 @@ function removeTestFile() {
 
 removeTestFile();
 
-file = fs.createWriteStream(filepath);
+// drain at 0, return false at 10.
+file = fs.createWriteStream(filepath, {
+  lowWaterMark: 0,
+  highWaterMark: 11
+});
 
 file.on('open', function(fd) {
+  console.error('open');
   cb_occurred += 'open ';
   assert.equal(typeof fd, 'number');
 });
 
 file.on('drain', function() {
+  console.error('drain');
   cb_occurred += 'drain ';
   ++countDrains;
   if (countDrains === 1) {
-    assert.equal(fs.readFileSync(filepath), EXPECTED);
-    file.write(EXPECTED);
+    console.error('drain=1, write again');
+    assert.equal(fs.readFileSync(filepath, 'utf8'), EXPECTED);
+    console.error('ondrain write ret=%j', file.write(EXPECTED));
     cb_occurred += 'write ';
   } else if (countDrains == 2) {
-    assert.equal(fs.readFileSync(filepath), EXPECTED + EXPECTED);
+    console.error('second drain, end');
+    assert.equal(fs.readFileSync(filepath, 'utf8'), EXPECTED + EXPECTED);
     file.end();
   }
 });
@@ -88,11 +98,15 @@ file.on('close', function() {
 
 file.on('error', function(err) {
   cb_occurred += 'error ';
-  assert.ok(err.message.indexOf('not writable') >= 0);
+  assert.ok(err.message.indexOf('write after end') >= 0);
 });
 
 
 for (var i = 0; i < 11; i++) {
-  assert.strictEqual(file.write(i), false);
+  var ret = file.write(i + '');
+  console.error('%d %j', i, ret);
+
+  // return false when i hits 10
+  assert(ret === (i != 10));
 }
 cb_occurred += 'write ';
index 71cff2c..a88802b 100644 (file)
@@ -60,12 +60,10 @@ file.on('data', function(data) {
 
   paused = true;
   file.pause();
-  assert.ok(file.paused);
 
   setTimeout(function() {
     paused = false;
     file.resume();
-    assert.ok(!file.paused);
   }, 10);
 });
 
@@ -77,7 +75,6 @@ file.on('end', function(chunk) {
 
 file.on('close', function() {
   callbacks.close++;
-  assert.ok(!file.readable);
 
   //assert.equal(fs.readFileSync(fn), fileContent);
 });
@@ -104,6 +101,7 @@ process.on('exit', function() {
   assert.equal(2, callbacks.close);
   assert.equal(30000, file.length);
   assert.equal(10000, file3.length);
+  console.error('ok');
 });
 
 var file4 = fs.createReadStream(rangeFile, {bufferSize: 1, start: 1, end: 2});