From 44b308b1f7beaa3398d869f1626f01ae6526bc0b Mon Sep 17 00:00:00 2001 From: isaacs Date: Thu, 4 Oct 2012 17:44:48 -0700 Subject: [PATCH] fs: streams2 --- lib/fs.js | 423 +++++++++++---------------------- test/simple/test-file-write-stream.js | 35 +-- test/simple/test-file-write-stream2.js | 40 +++- test/simple/test-fs-read-stream.js | 4 +- 4 files changed, 189 insertions(+), 313 deletions(-) diff --git a/lib/fs.js b/lib/fs.js index 83bacc9..96af631 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -34,6 +34,9 @@ var fs = exports; var Stream = require('stream').Stream; var EventEmitter = require('events').EventEmitter; +var Readable = Stream.Readable; +var Writable = Stream.Writable; + var kMinPoolSpace = 128; var kPoolSize = 40 * 1024; @@ -1386,34 +1389,30 @@ fs.createReadStream = function(path, options) { return new ReadStream(path, options); }; -var ReadStream = fs.ReadStream = function(path, options) { - if (!(this instanceof ReadStream)) return new ReadStream(path, options); - - Stream.call(this); - - var self = this; - - this.path = path; - this.fd = null; - this.readable = true; - this.paused = false; +util.inherits(ReadStream, Readable); +fs.ReadStream = ReadStream; - this.flags = 'r'; - this.mode = 438; /*=0666*/ - this.bufferSize = 64 * 1024; +function ReadStream(path, options) { + if (!(this instanceof ReadStream)) + return new ReadStream(path, options); - options = options || {}; + // a little bit bigger buffer and water marks by default + options = util._extend({ + bufferSize: 64 * 1024, + lowWaterMark: 16 * 1024, + highWaterMark: 64 * 1024 + }, options || {}); - // Mixin options into this - var keys = Object.keys(options); - for (var index = 0, length = keys.length; index < length; index++) { - var key = keys[index]; - this[key] = options[key]; - } + Readable.call(this, options); - assertEncoding(this.encoding); + this.path = path; + this.fd = options.hasOwnProperty('fd') ? options.fd : null; + this.flags = options.hasOwnProperty('flags') ? options.flags : 'r'; + this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/ - if (this.encoding) this.setEncoding(this.encoding); + this.start = options.hasOwnProperty('start') ? options.start : undefined; + this.end = options.hasOwnProperty('start') ? options.end : undefined; + this.pos = undefined; if (this.start !== undefined) { if ('number' !== typeof this.start) { @@ -1432,41 +1431,40 @@ var ReadStream = fs.ReadStream = function(path, options) { this.pos = this.start; } - if (this.fd !== null) { - process.nextTick(function() { - self._read(); - }); - return; - } + if (typeof this.fd !== 'number') + this.open(); - fs.open(this.path, this.flags, this.mode, function(err, fd) { - if (err) { - self.emit('error', err); - self.readable = false; + this.on('end', function() { + this.destroy(); + }); +} + +fs.FileReadStream = fs.ReadStream; // support the legacy name + +ReadStream.prototype.open = function() { + var self = this; + fs.open(this.path, this.flags, this.mode, function(er, fd) { + if (er) { + self.destroy(); + self.emit('error', er); return; } self.fd = fd; self.emit('open', fd); - self._read(); + // start the flow of data. + self.read(); }); }; -util.inherits(ReadStream, Stream); - -fs.FileReadStream = fs.ReadStream; // support the legacy name - -ReadStream.prototype.setEncoding = function(encoding) { - assertEncoding(encoding); - var StringDecoder = require('string_decoder').StringDecoder; // lazy load - this._decoder = new StringDecoder(encoding); -}; +ReadStream.prototype._read = function(n, cb) { + if (typeof this.fd !== 'number') + return this.once('open', function() { + this._read(n, cb); + }); -ReadStream.prototype._read = function() { - var self = this; - if (!this.readable || this.paused || this.reading) return; - - this.reading = true; + if (this.destroyed) + return; if (!pool || pool.length - pool.used < kMinPoolSpace) { // discard the old pool. Can't add to the free list because @@ -1475,149 +1473,110 @@ ReadStream.prototype._read = function() { allocNewPool(); } - // Grab another reference to the pool in the case that while we're in the - // thread pool another read() finishes up the pool, and allocates a new - // one. + // Grab another reference to the pool in the case that while we're + // in the thread pool another read() finishes up the pool, and + // allocates a new one. var thisPool = pool; - var toRead = Math.min(pool.length - pool.used, ~~this.bufferSize); + var toRead = Math.min(pool.length - pool.used, n); var start = pool.used; - if (this.pos !== undefined) { + if (this.pos !== undefined) toRead = Math.min(this.end - this.pos + 1, toRead); - } - function afterRead(err, bytesRead) { - self.reading = false; - if (err) { - fs.close(self.fd, function() { - self.fd = null; - self.emit('error', err); - self.readable = false; - }); - return; - } + // already read everything we were supposed to read! + // treat as EOF. + if (toRead <= 0) + return cb(); - if (bytesRead === 0) { - if (this._decoder) { - var ret = this._decoder.end(); - if (ret) - this.emit('data', ret); - } - self.emit('end'); - self.destroy(); - return; - } - - var b = thisPool.slice(start, start + bytesRead); + // the actual read. + var self = this; + fs.read(this.fd, pool, pool.used, toRead, this.pos, onread); - // Possible optimizition here? - // Reclaim some bytes if bytesRead < toRead? - // Would need to ensure that pool === thisPool. + // move the pool positions, and internal position for reading. + if (this.pos !== undefined) + this.pos += toRead; + pool.used += toRead; - // do not emit events if the stream is paused - if (self.paused) { - self.buffer = b; - return; + function onread(er, bytesRead) { + if (er) { + self.destroy(); + return cb(er); } - // do not emit events anymore after we declared the stream unreadable - if (!self.readable) return; + var b = null; + if (bytesRead > 0) + b = thisPool.slice(start, start + bytesRead); - self._emitData(b); - self._read(); + cb(null, b); } - - fs.read(this.fd, pool, pool.used, toRead, this.pos, afterRead); - - if (this.pos !== undefined) { - this.pos += toRead; - } - pool.used += toRead; }; -ReadStream.prototype._emitData = function(d) { - if (this._decoder) { - var string = this._decoder.write(d); - if (string.length) this.emit('data', string); - } else { - this.emit('data', d); - } +ReadStream.prototype.destroy = function() { + if (this.destroyed) + return; + this.destroyed = true; + + if ('number' === typeof this.fd) + this.close(); }; -ReadStream.prototype.destroy = function() { +ReadStream.prototype.close = function(cb) { + if (cb) + this.once('close', cb); + if (this.closed || 'number' !== typeof this.fd) { + if ('number' !== typeof this.fd) + this.once('open', close); + return process.nextTick(this.emit.bind(this, 'close')); + } + this.closed = true; var self = this; - - if (!this.readable) return; - this.readable = false; + close(); function close() { - fs.close(self.fd, function(err) { - if (err) { - self.emit('error', err); - } else { + fs.close(self.fd, function(er) { + if (er) + self.emit('error', er); + else self.emit('close'); - } }); - } - - if (this.fd === null) { - this.addListener('open', close); - } else { - close(); + self.fd = null; } }; -ReadStream.prototype.pause = function() { - this.paused = true; -}; - - -ReadStream.prototype.resume = function() { - this.paused = false; - - if (this.buffer) { - var buffer = this.buffer; - this.buffer = null; - this._emitData(buffer); - } - - // hasn't opened yet. - if (null == this.fd) return; - - this._read(); -}; - fs.createWriteStream = function(path, options) { return new WriteStream(path, options); }; -var WriteStream = fs.WriteStream = function(path, options) { - if (!(this instanceof WriteStream)) return new WriteStream(path, options); +util.inherits(WriteStream, Writable); +fs.WriteStream = WriteStream; +function WriteStream(path, options) { + if (!(this instanceof WriteStream)) + return new WriteStream(path, options); - Stream.call(this); + // a little bit bigger buffer and water marks by default + options = util._extend({ + bufferSize: 64 * 1024, + lowWaterMark: 16 * 1024, + highWaterMark: 64 * 1024 + }, options || {}); + + Writable.call(this, options); this.path = path; this.fd = null; - this.writable = true; - - this.flags = 'w'; - this.encoding = 'binary'; - this.mode = 438; /*=0666*/ - this.bytesWritten = 0; - options = options || {}; + this.fd = options.hasOwnProperty('fd') ? options.fd : null; + this.flags = options.hasOwnProperty('flags') ? options.flags : 'w'; + this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/ - // Mixin options into this - var keys = Object.keys(options); - for (var index = 0, length = keys.length; index < length; index++) { - var key = keys[index]; - this[key] = options[key]; - } + this.start = options.hasOwnProperty('start') ? options.start : undefined; + this.pos = undefined; + this.bytesWritten = 0; if (this.start !== undefined) { if ('number' !== typeof this.start) { @@ -1630,154 +1589,54 @@ var WriteStream = fs.WriteStream = function(path, options) { this.pos = this.start; } - this.busy = false; - this._queue = []; + if ('number' !== typeof this.fd) + this.open(); - if (this.fd === null) { - this._open = fs.open; - this._queue.push([this._open, this.path, this.flags, this.mode, undefined]); - this.flush(); - } -}; -util.inherits(WriteStream, Stream); + // dispose on finish. + this.once('finish', this.close); +} fs.FileWriteStream = fs.WriteStream; // support the legacy name -WriteStream.prototype.flush = function() { - if (this.busy) return; - var self = this; - - var args = this._queue.shift(); - if (!args) { - if (this.drainable) { this.emit('drain'); } - return; - } - - this.busy = true; - - var method = args.shift(), - cb = args.pop(); - - args.push(function(err) { - self.busy = false; - - if (err) { - self.writable = false; - - function emit() { - self.fd = null; - if (cb) cb(err); - self.emit('error', err); - } - - if (self.fd === null) { - emit(); - } else { - fs.close(self.fd, emit); - } - - return; - } - if (method == fs.write) { - self.bytesWritten += arguments[1]; - if (cb) { - // write callback - cb(null, arguments[1]); - } - - } else if (method === self._open) { - // save reference for file pointer - self.fd = arguments[1]; - self.emit('open', self.fd); - - } else if (method === fs.close) { - // stop flushing after close - if (cb) { - cb(null); - } - self.emit('close'); +WriteStream.prototype.open = function() { + fs.open(this.path, this.flags, this.mode, function(er, fd) { + if (er) { + this.destroy(); + this.emit('error', er); return; } - self.flush(); - }); - - // Inject the file pointer - if (method !== self._open) { - args.unshift(this.fd); - } - - method.apply(this, args); + this.fd = fd; + this.emit('open', fd); + }.bind(this)); }; -WriteStream.prototype.write = function(data) { - if (!this.writable) { - this.emit('error', new Error('stream not writable')); - return false; - } - this.drainable = true; - - var cb; - if (typeof(arguments[arguments.length - 1]) == 'function') { - cb = arguments[arguments.length - 1]; - } +WriteStream.prototype._write = function(data, cb) { + if (!Buffer.isBuffer(data)) + return this.emit('error', new Error('Invalid data')); - if (!Buffer.isBuffer(data)) { - var encoding = 'utf8'; - if (typeof(arguments[1]) == 'string') encoding = arguments[1]; - assertEncoding(encoding); - data = new Buffer('' + data, encoding); - } + if (typeof this.fd !== 'number') + return this.once('open', this._write.bind(this, data, cb)); - this._queue.push([fs.write, data, 0, data.length, this.pos, cb]); + var self = this; + fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) { + if (er) { + self.destroy(); + return cb(er); + } + self.bytesWritten += bytes; + cb(); + }); - if (this.pos !== undefined) { + if (this.pos !== undefined) this.pos += data.length; - } - - this.flush(); - - return false; -}; - -WriteStream.prototype.end = function(data, encoding, cb) { - if (typeof(data) === 'function') { - cb = data; - } else if (typeof(encoding) === 'function') { - cb = encoding; - this.write(data); - } else if (arguments.length > 0) { - this.write(data, encoding); - } - this.writable = false; - this._queue.push([fs.close, cb]); - this.flush(); }; -WriteStream.prototype.destroy = function() { - var self = this; - - if (!this.writable) return; - this.writable = false; - - function close() { - fs.close(self.fd, function(err) { - if (err) { - self.emit('error', err); - } else { - self.emit('close'); - } - }); - } - if (this.fd === null) { - this.addListener('open', close); - } else { - close(); - } -}; +WriteStream.prototype.destroy = ReadStream.prototype.destroy; +WriteStream.prototype.close = ReadStream.prototype.close; // There is no shutdown() for files. WriteStream.prototype.destroySoon = WriteStream.prototype.end; diff --git a/test/simple/test-file-write-stream.js b/test/simple/test-file-write-stream.js index 5d2286c..8829544 100644 --- a/test/simple/test-file-write-stream.js +++ b/test/simple/test-file-write-stream.js @@ -22,46 +22,50 @@ var common = require('../common'); var assert = require('assert'); -var path = require('path'), - fs = require('fs'), - fn = path.join(common.tmpDir, 'write.txt'), - file = fs.createWriteStream(fn), +var path = require('path'); +var fs = require('fs'); +var fn = path.join(common.tmpDir, 'write.txt'); +var file = fs.createWriteStream(fn, { + lowWaterMark: 3, + highWaterMark: 10 + }); - EXPECTED = '012345678910', +var EXPECTED = '012345678910'; - callbacks = { +var callbacks = { open: -1, drain: -2, - close: -1, - endCb: -1 + close: -1 }; file .on('open', function(fd) { + console.error('open!'); callbacks.open++; assert.equal('number', typeof fd); }) .on('error', function(err) { throw err; + console.error('error!', err.stack); }) .on('drain', function() { + console.error('drain!', callbacks.drain); callbacks.drain++; if (callbacks.drain == -1) { - assert.equal(EXPECTED, fs.readFileSync(fn)); + assert.equal(EXPECTED, fs.readFileSync(fn, 'utf8')); file.write(EXPECTED); } else if (callbacks.drain == 0) { - assert.equal(EXPECTED + EXPECTED, fs.readFileSync(fn)); - file.end(function(err) { - assert.ok(!err); - callbacks.endCb++; - }); + assert.equal(EXPECTED + EXPECTED, fs.readFileSync(fn, 'utf8')); + file.end(); } }) .on('close', function() { + console.error('close!'); assert.strictEqual(file.bytesWritten, EXPECTED.length * 2); callbacks.close++; assert.throws(function() { + console.error('write after end should not be allowed'); file.write('should not work anymore'); }); @@ -70,7 +74,7 @@ file for (var i = 0; i < 11; i++) { (function(i) { - assert.strictEqual(false, file.write(i)); + file.write('' + i); })(i); } @@ -78,4 +82,5 @@ process.on('exit', function() { for (var k in callbacks) { assert.equal(0, callbacks[k], k + ' count off by ' + callbacks[k]); } + console.log('ok'); }); diff --git a/test/simple/test-file-write-stream2.js b/test/simple/test-file-write-stream2.js index 9b5d7ff..4f2e73c 100644 --- a/test/simple/test-file-write-stream2.js +++ b/test/simple/test-file-write-stream2.js @@ -22,18 +22,18 @@ var common = require('../common'); var assert = require('assert'); -var path = require('path'), - fs = require('fs'), - util = require('util'); +var path = require('path'); +var fs = require('fs'); +var util = require('util'); -var filepath = path.join(common.tmpDir, 'write.txt'), - file; +var filepath = path.join(common.tmpDir, 'write.txt'); +var file; var EXPECTED = '012345678910'; -var cb_expected = 'write open drain write drain close error ', - cb_occurred = ''; +var cb_expected = 'write open drain write drain close error '; +var cb_occurred = ''; var countDrains = 0; @@ -47,6 +47,8 @@ process.on('exit', function() { assert.strictEqual(cb_occurred, cb_expected, 'events missing or out of order: "' + cb_occurred + '" !== "' + cb_expected + '"'); + } else { + console.log('ok'); } }); @@ -59,22 +61,30 @@ function removeTestFile() { removeTestFile(); -file = fs.createWriteStream(filepath); +// drain at 0, return false at 10. +file = fs.createWriteStream(filepath, { + lowWaterMark: 0, + highWaterMark: 11 +}); file.on('open', function(fd) { + console.error('open'); cb_occurred += 'open '; assert.equal(typeof fd, 'number'); }); file.on('drain', function() { + console.error('drain'); cb_occurred += 'drain '; ++countDrains; if (countDrains === 1) { - assert.equal(fs.readFileSync(filepath), EXPECTED); - file.write(EXPECTED); + console.error('drain=1, write again'); + assert.equal(fs.readFileSync(filepath, 'utf8'), EXPECTED); + console.error('ondrain write ret=%j', file.write(EXPECTED)); cb_occurred += 'write '; } else if (countDrains == 2) { - assert.equal(fs.readFileSync(filepath), EXPECTED + EXPECTED); + console.error('second drain, end'); + assert.equal(fs.readFileSync(filepath, 'utf8'), EXPECTED + EXPECTED); file.end(); } }); @@ -88,11 +98,15 @@ file.on('close', function() { file.on('error', function(err) { cb_occurred += 'error '; - assert.ok(err.message.indexOf('not writable') >= 0); + assert.ok(err.message.indexOf('write after end') >= 0); }); for (var i = 0; i < 11; i++) { - assert.strictEqual(file.write(i), false); + var ret = file.write(i + ''); + console.error('%d %j', i, ret); + + // return false when i hits 10 + assert(ret === (i != 10)); } cb_occurred += 'write '; diff --git a/test/simple/test-fs-read-stream.js b/test/simple/test-fs-read-stream.js index 71cff2c..a88802b 100644 --- a/test/simple/test-fs-read-stream.js +++ b/test/simple/test-fs-read-stream.js @@ -60,12 +60,10 @@ file.on('data', function(data) { paused = true; file.pause(); - assert.ok(file.paused); setTimeout(function() { paused = false; file.resume(); - assert.ok(!file.paused); }, 10); }); @@ -77,7 +75,6 @@ file.on('end', function(chunk) { file.on('close', function() { callbacks.close++; - assert.ok(!file.readable); //assert.equal(fs.readFileSync(fn), fileContent); }); @@ -104,6 +101,7 @@ process.on('exit', function() { assert.equal(2, callbacks.close); assert.equal(30000, file.length); assert.equal(10000, file3.length); + console.error('ok'); }); var file4 = fs.createReadStream(rangeFile, {bufferSize: 1, start: 1, end: 2}); -- 2.7.4