var Stream = require('stream').Stream;
var EventEmitter = require('events').EventEmitter;
+var Readable = Stream.Readable;
+var Writable = Stream.Writable;
+
var kMinPoolSpace = 128;
var kPoolSize = 40 * 1024;
return new ReadStream(path, options);
};
-var ReadStream = fs.ReadStream = function(path, options) {
- if (!(this instanceof ReadStream)) return new ReadStream(path, options);
-
- Stream.call(this);
-
- var self = this;
-
- this.path = path;
- this.fd = null;
- this.readable = true;
- this.paused = false;
+util.inherits(ReadStream, Readable);
+fs.ReadStream = ReadStream;
- this.flags = 'r';
- this.mode = 438; /*=0666*/
- this.bufferSize = 64 * 1024;
+function ReadStream(path, options) {
+ if (!(this instanceof ReadStream))
+ return new ReadStream(path, options);
- options = options || {};
+ // a little bit bigger buffer and water marks by default
+ options = util._extend({
+ bufferSize: 64 * 1024,
+ lowWaterMark: 16 * 1024,
+ highWaterMark: 64 * 1024
+ }, options || {});
- // Mixin options into this
- var keys = Object.keys(options);
- for (var index = 0, length = keys.length; index < length; index++) {
- var key = keys[index];
- this[key] = options[key];
- }
+ Readable.call(this, options);
- assertEncoding(this.encoding);
+ this.path = path;
+ this.fd = options.hasOwnProperty('fd') ? options.fd : null;
+ this.flags = options.hasOwnProperty('flags') ? options.flags : 'r';
+ this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/
- if (this.encoding) this.setEncoding(this.encoding);
+ this.start = options.hasOwnProperty('start') ? options.start : undefined;
+ this.end = options.hasOwnProperty('start') ? options.end : undefined;
+ this.pos = undefined;
if (this.start !== undefined) {
if ('number' !== typeof this.start) {
this.pos = this.start;
}
- if (this.fd !== null) {
- process.nextTick(function() {
- self._read();
- });
- return;
- }
+ if (typeof this.fd !== 'number')
+ this.open();
- fs.open(this.path, this.flags, this.mode, function(err, fd) {
- if (err) {
- self.emit('error', err);
- self.readable = false;
+ this.on('end', function() {
+ this.destroy();
+ });
+}
+
+fs.FileReadStream = fs.ReadStream; // support the legacy name
+
+ReadStream.prototype.open = function() {
+ var self = this;
+ fs.open(this.path, this.flags, this.mode, function(er, fd) {
+ if (er) {
+ self.destroy();
+ self.emit('error', er);
return;
}
self.fd = fd;
self.emit('open', fd);
- self._read();
+ // start the flow of data.
+ self.read();
});
};
-util.inherits(ReadStream, Stream);
-
-fs.FileReadStream = fs.ReadStream; // support the legacy name
-
-ReadStream.prototype.setEncoding = function(encoding) {
- assertEncoding(encoding);
- var StringDecoder = require('string_decoder').StringDecoder; // lazy load
- this._decoder = new StringDecoder(encoding);
-};
+ReadStream.prototype._read = function(n, cb) {
+ if (typeof this.fd !== 'number')
+ return this.once('open', function() {
+ this._read(n, cb);
+ });
-ReadStream.prototype._read = function() {
- var self = this;
- if (!this.readable || this.paused || this.reading) return;
-
- this.reading = true;
+ if (this.destroyed)
+ return;
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// discard the old pool. Can't add to the free list because
allocNewPool();
}
- // Grab another reference to the pool in the case that while we're in the
- // thread pool another read() finishes up the pool, and allocates a new
- // one.
+ // Grab another reference to the pool in the case that while we're
+ // in the thread pool another read() finishes up the pool, and
+ // allocates a new one.
var thisPool = pool;
- var toRead = Math.min(pool.length - pool.used, ~~this.bufferSize);
+ var toRead = Math.min(pool.length - pool.used, n);
var start = pool.used;
- if (this.pos !== undefined) {
+ if (this.pos !== undefined)
toRead = Math.min(this.end - this.pos + 1, toRead);
- }
- function afterRead(err, bytesRead) {
- self.reading = false;
- if (err) {
- fs.close(self.fd, function() {
- self.fd = null;
- self.emit('error', err);
- self.readable = false;
- });
- return;
- }
+ // already read everything we were supposed to read!
+ // treat as EOF.
+ if (toRead <= 0)
+ return cb();
- if (bytesRead === 0) {
- if (this._decoder) {
- var ret = this._decoder.end();
- if (ret)
- this.emit('data', ret);
- }
- self.emit('end');
- self.destroy();
- return;
- }
-
- var b = thisPool.slice(start, start + bytesRead);
+ // the actual read.
+ var self = this;
+ fs.read(this.fd, pool, pool.used, toRead, this.pos, onread);
- // Possible optimizition here?
- // Reclaim some bytes if bytesRead < toRead?
- // Would need to ensure that pool === thisPool.
+ // move the pool positions, and internal position for reading.
+ if (this.pos !== undefined)
+ this.pos += toRead;
+ pool.used += toRead;
- // do not emit events if the stream is paused
- if (self.paused) {
- self.buffer = b;
- return;
+ function onread(er, bytesRead) {
+ if (er) {
+ self.destroy();
+ return cb(er);
}
- // do not emit events anymore after we declared the stream unreadable
- if (!self.readable) return;
+ var b = null;
+ if (bytesRead > 0)
+ b = thisPool.slice(start, start + bytesRead);
- self._emitData(b);
- self._read();
+ cb(null, b);
}
-
- fs.read(this.fd, pool, pool.used, toRead, this.pos, afterRead);
-
- if (this.pos !== undefined) {
- this.pos += toRead;
- }
- pool.used += toRead;
};
-ReadStream.prototype._emitData = function(d) {
- if (this._decoder) {
- var string = this._decoder.write(d);
- if (string.length) this.emit('data', string);
- } else {
- this.emit('data', d);
- }
+ReadStream.prototype.destroy = function() {
+ if (this.destroyed)
+ return;
+ this.destroyed = true;
+
+ if ('number' === typeof this.fd)
+ this.close();
};
-ReadStream.prototype.destroy = function() {
+ReadStream.prototype.close = function(cb) {
+ if (cb)
+ this.once('close', cb);
+ if (this.closed || 'number' !== typeof this.fd) {
+ if ('number' !== typeof this.fd)
+ this.once('open', close);
+ return process.nextTick(this.emit.bind(this, 'close'));
+ }
+ this.closed = true;
var self = this;
-
- if (!this.readable) return;
- this.readable = false;
+ close();
function close() {
- fs.close(self.fd, function(err) {
- if (err) {
- self.emit('error', err);
- } else {
+ fs.close(self.fd, function(er) {
+ if (er)
+ self.emit('error', er);
+ else
self.emit('close');
- }
});
- }
-
- if (this.fd === null) {
- this.addListener('open', close);
- } else {
- close();
+ self.fd = null;
}
};
-ReadStream.prototype.pause = function() {
- this.paused = true;
-};
-
-
-ReadStream.prototype.resume = function() {
- this.paused = false;
-
- if (this.buffer) {
- var buffer = this.buffer;
- this.buffer = null;
- this._emitData(buffer);
- }
-
- // hasn't opened yet.
- if (null == this.fd) return;
-
- this._read();
-};
-
fs.createWriteStream = function(path, options) {
return new WriteStream(path, options);
};
-var WriteStream = fs.WriteStream = function(path, options) {
- if (!(this instanceof WriteStream)) return new WriteStream(path, options);
+util.inherits(WriteStream, Writable);
+fs.WriteStream = WriteStream;
+function WriteStream(path, options) {
+ if (!(this instanceof WriteStream))
+ return new WriteStream(path, options);
- Stream.call(this);
+ // a little bit bigger buffer and water marks by default
+ options = util._extend({
+ bufferSize: 64 * 1024,
+ lowWaterMark: 16 * 1024,
+ highWaterMark: 64 * 1024
+ }, options || {});
+
+ Writable.call(this, options);
this.path = path;
this.fd = null;
- this.writable = true;
-
- this.flags = 'w';
- this.encoding = 'binary';
- this.mode = 438; /*=0666*/
- this.bytesWritten = 0;
- options = options || {};
+ this.fd = options.hasOwnProperty('fd') ? options.fd : null;
+ this.flags = options.hasOwnProperty('flags') ? options.flags : 'w';
+ this.mode = options.hasOwnProperty('mode') ? options.mode : 438; /*=0666*/
- // Mixin options into this
- var keys = Object.keys(options);
- for (var index = 0, length = keys.length; index < length; index++) {
- var key = keys[index];
- this[key] = options[key];
- }
+ this.start = options.hasOwnProperty('start') ? options.start : undefined;
+ this.pos = undefined;
+ this.bytesWritten = 0;
if (this.start !== undefined) {
if ('number' !== typeof this.start) {
this.pos = this.start;
}
- this.busy = false;
- this._queue = [];
+ if ('number' !== typeof this.fd)
+ this.open();
- if (this.fd === null) {
- this._open = fs.open;
- this._queue.push([this._open, this.path, this.flags, this.mode, undefined]);
- this.flush();
- }
-};
-util.inherits(WriteStream, Stream);
+ // dispose on finish.
+ this.once('finish', this.close);
+}
fs.FileWriteStream = fs.WriteStream; // support the legacy name
-WriteStream.prototype.flush = function() {
- if (this.busy) return;
- var self = this;
-
- var args = this._queue.shift();
- if (!args) {
- if (this.drainable) { this.emit('drain'); }
- return;
- }
-
- this.busy = true;
-
- var method = args.shift(),
- cb = args.pop();
-
- args.push(function(err) {
- self.busy = false;
-
- if (err) {
- self.writable = false;
-
- function emit() {
- self.fd = null;
- if (cb) cb(err);
- self.emit('error', err);
- }
-
- if (self.fd === null) {
- emit();
- } else {
- fs.close(self.fd, emit);
- }
-
- return;
- }
- if (method == fs.write) {
- self.bytesWritten += arguments[1];
- if (cb) {
- // write callback
- cb(null, arguments[1]);
- }
-
- } else if (method === self._open) {
- // save reference for file pointer
- self.fd = arguments[1];
- self.emit('open', self.fd);
-
- } else if (method === fs.close) {
- // stop flushing after close
- if (cb) {
- cb(null);
- }
- self.emit('close');
+WriteStream.prototype.open = function() {
+ fs.open(this.path, this.flags, this.mode, function(er, fd) {
+ if (er) {
+ this.destroy();
+ this.emit('error', er);
return;
}
- self.flush();
- });
-
- // Inject the file pointer
- if (method !== self._open) {
- args.unshift(this.fd);
- }
-
- method.apply(this, args);
+ this.fd = fd;
+ this.emit('open', fd);
+ }.bind(this));
};
-WriteStream.prototype.write = function(data) {
- if (!this.writable) {
- this.emit('error', new Error('stream not writable'));
- return false;
- }
- this.drainable = true;
-
- var cb;
- if (typeof(arguments[arguments.length - 1]) == 'function') {
- cb = arguments[arguments.length - 1];
- }
+WriteStream.prototype._write = function(data, cb) {
+ if (!Buffer.isBuffer(data))
+ return this.emit('error', new Error('Invalid data'));
- if (!Buffer.isBuffer(data)) {
- var encoding = 'utf8';
- if (typeof(arguments[1]) == 'string') encoding = arguments[1];
- assertEncoding(encoding);
- data = new Buffer('' + data, encoding);
- }
+ if (typeof this.fd !== 'number')
+ return this.once('open', this._write.bind(this, data, cb));
- this._queue.push([fs.write, data, 0, data.length, this.pos, cb]);
+ var self = this;
+ fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) {
+ if (er) {
+ self.destroy();
+ return cb(er);
+ }
+ self.bytesWritten += bytes;
+ cb();
+ });
- if (this.pos !== undefined) {
+ if (this.pos !== undefined)
this.pos += data.length;
- }
-
- this.flush();
-
- return false;
-};
-
-WriteStream.prototype.end = function(data, encoding, cb) {
- if (typeof(data) === 'function') {
- cb = data;
- } else if (typeof(encoding) === 'function') {
- cb = encoding;
- this.write(data);
- } else if (arguments.length > 0) {
- this.write(data, encoding);
- }
- this.writable = false;
- this._queue.push([fs.close, cb]);
- this.flush();
};
-WriteStream.prototype.destroy = function() {
- var self = this;
-
- if (!this.writable) return;
- this.writable = false;
-
- function close() {
- fs.close(self.fd, function(err) {
- if (err) {
- self.emit('error', err);
- } else {
- self.emit('close');
- }
- });
- }
- if (this.fd === null) {
- this.addListener('open', close);
- } else {
- close();
- }
-};
+WriteStream.prototype.destroy = ReadStream.prototype.destroy;
+WriteStream.prototype.close = ReadStream.prototype.close;
// There is no shutdown() for files.
WriteStream.prototype.destroySoon = WriteStream.prototype.end;