-var sys = require('sys'),
- events = require('events'),
- Buffer = require('buffer').Buffer;
+var sys = require('sys');
+var events = require('events');
+var Buffer = require('buffer').Buffer;
var binding = process.binding('fs');
var fs = exports;
+var kMinPoolSpace = 128;
+var kPoolSize = 40*1024;
+
fs.Stats = binding.Stats;
fs.Stats.prototype._checkModeProperty = function (property) {
}
}
next();
+};
+
+var pool;
+function allocNewPool () {
+ pool = new Buffer(kPoolSize);
+ pool.used = 0;
}
+
+
fs.createReadStream = function(path, options) {
return new ReadStream(path, options);
};
this.paused = false;
this.flags = 'r';
- this.encoding = 'binary';
this.mode = 0666;
this.bufferSize = 4 * 1024;
fs.FileReadStream = fs.ReadStream; // support the legacy name
-ReadStream.prototype.setEncoding = function(encoding) {
- this.encoding = encoding;
+ReadStream.prototype.setEncoding = function (encoding) {
+ var Utf8Decoder = require('utf8decoder').Utf8Decoder; // lazy load
+ var self = this;
+ this._encoding = encoding.toLowerCase();
+ if (this._encoding == 'utf-8' || this._encoding == 'utf8') {
+ this._decoder = new Utf8Decoder();
+ this._decoder.onString = function(str) {
+ self.emit('data', str);
+ };
+ } else if (this._decoder) {
+ delete this._decoder;
+ }
};
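
With an encoding set through this method, 'data' is emitted as strings rather than Buffer slices, and the lazily loaded Utf8Decoder reassembles multibyte characters that were split across pool slices. A minimal usage sketch, assuming this patched fs module (the file path is hypothetical):

var fs = require('fs');
var stream = fs.createReadStream('/tmp/utf8-sample.txt'); // hypothetical path
stream.setEncoding('utf8');
stream.addListener('data', function (str) {
  // 'data' now carries strings; partial multibyte sequences are held back
  // by the decoder until they can be completed by the next chunk.
  console.log(typeof str); // 'string'
});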
+
ReadStream.prototype._read = function () {
var self = this;
if (!self.readable || self.paused) return;
- fs.read(self.fd,
- self.bufferSize,
- undefined,
- self.encoding,
- function(err, data, bytesRead) {
+ if (!pool || pool.length - pool.used < kMinPoolSpace) {
+ // Discard the old pool. We can't add it to a free list because
+ // users might still have references to slices of it.
+ pool = null;
+ allocNewPool();
+ }
+
+ // Grab another reference to the pool in case, while we're in the thread
+ // pool, another read() uses up the rest of this pool and allocates a
+ // new one.
+ var thisPool = pool;
+ var toRead = Math.min(pool.length - pool.used, this.bufferSize);
+ var start = pool.used;
+
+ function afterRead (err, bytesRead) {
if (err) {
self.emit('error', err);
self.readable = false;
return;
}
+ var b = thisPool.slice(start, start+bytesRead);
+
+ // Possible optimization here?
+ // Reclaim some bytes if bytesRead < toRead?
+ // Would need to ensure that pool === thisPool.
+
// do not emit events if the stream is paused
if (self.paused) {
- self.buffer = data;
+ self.buffer = b;
return;
}
// do not emit events anymore after we declared the stream unreadable
- if (!self.readable) {
- return;
- }
+ if (!self.readable) return;
- self.emit('data', data);
+ self._emitData(b);
self._read();
- });
+ }
+
+ fs.read(self.fd, pool, pool.used, toRead, undefined, afterRead);
+ pool.used += toRead;
+};
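
The comments in _read describe the pooling strategy: every read lands in one shared Buffer, each emitted chunk is a zero-copy slice of it, and a nearly full pool is simply abandoned (old slices keep it alive) while a fresh one is allocated. A standalone sketch of that idea, not part of the patch; reserve() is a hypothetical helper whose names mirror the constants above:

var Buffer = require('buffer').Buffer;
var kPoolSize = 40 * 1024;
var kMinPoolSpace = 128;
var pool = new Buffer(kPoolSize);
pool.used = 0;

function reserve (n) {
  if (kPoolSize - pool.used < kMinPoolSpace) {
    // Abandon the nearly full pool; outstanding slices still reference it,
    // so its memory is reclaimed only once they are gone.
    pool = new Buffer(kPoolSize);
    pool.used = 0;
  }
  var start = pool.used;
  pool.used += n;
  return pool.slice(start, start + n); // zero-copy view into the shared pool
}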
+
+
+ReadStream.prototype._emitData = function (d) {
+ if (!this._encoding) {
+ this.emit('data', d);
+ } else if (this._decoder) {
+ this._decoder.write(d);
+ } else {
+ var string = d.toString(this._encoding, 0, d.length);
+ this.emit('data', string);
+ }
};
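
_emitData picks one of three paths: a raw Buffer slice when no encoding is set, the stateful decoder for utf8, and a plain per-chunk toString() for any other encoding. A hedged sketch of that last fallback (hypothetical file path):

var s = fs.createReadStream('/tmp/ascii-sample.txt'); // hypothetical path
s.setEncoding('ascii');
s.addListener('data', function (str) {
  // Each string is one pool slice converted in isolation with
  // chunk.toString('ascii', 0, chunk.length); no state is kept across chunks.
});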
sys.error(readStreamForceCloseWarning);
}
return this.destroy(cb);
-}
+};
ReadStream.prototype.destroy = function (cb) {
function close() {
fs.close(self.fd, function(err) {
if (err) {
- if (cb) {
- cb(err);
- }
+ if (cb) cb(err);
self.emit('error', err);
return;
}
- if (cb) {
- cb(null);
- }
+ if (cb) cb(null);
self.emit('close');
});
}
this.paused = false;
if (this.buffer) {
- this.emit('data', this.buffer);
+ this._emitData(this.buffer);
this.buffer = null;
}
sys.error(writeStreamForceCloseWarning);
}
return this.destroy(cb);
-}
+};
WriteStream.prototype.forceClose = function (cb) {
require('../common');
-var
- path = require('path'),
- fs = require('fs'),
- fn = path.join(fixturesDir, 'test_ca.pem'),
- file = fs.createReadStream(fn),
-
- callbacks = {
- open: -1,
- end: -1,
- close: -1,
- destroy: -1
- },
-
- paused = false,
-
- fileContent = '';
-
-file
- .addListener('open', function(fd) {
- callbacks.open++;
- assert.equal('number', typeof fd);
- assert.ok(file.readable);
- })
- .addListener('error', function(err) {
- throw err;
- })
- .addListener('data', function(data) {
- assert.ok(!paused);
- fileContent += data;
-
- paused = true;
- file.pause();
- assert.ok(file.paused);
-
- setTimeout(function() {
- paused = false;
- file.resume();
- assert.ok(!file.paused);
- }, 10);
- })
- .addListener('end', function(chunk) {
- callbacks.end++;
- })
- .addListener('close', function() {
- callbacks.close++;
- assert.ok(!file.readable);
-
- assert.equal(fs.readFileSync(fn), fileContent);
- });
+// TODO Improve this test. test_ca.pem is too small. A proper test would
+// create a large utf8 file (with multibyte chars) and stream it in,
+// performing sanity checks throughout.
+
+Buffer = require('buffer').Buffer;
+path = require('path');
+fs = require('fs');
+fn = path.join(fixturesDir, 'test_ca.pem');
+
+file = fs.createReadStream(fn);
+
+callbacks = {
+ open: -1,
+ end: -1,
+ data: -1,
+ close: -1,
+ destroy: -1
+};
+
+paused = false;
+
+fileContent = '';
+
+file.addListener('open', function(fd) {
+ callbacks.open++;
+ assert.equal('number', typeof fd);
+ assert.ok(file.readable);
+});
+
+file.addListener('error', function(err) {
+ throw err;
+});
+
+file.addListener('data', function(data) {
+ callbacks.data++;
+ assert.ok(data instanceof Buffer);
+ assert.ok(!paused);
+ fileContent += data;
+
+ paused = true;
+ file.pause();
+ assert.ok(file.paused);
+
+ setTimeout(function() {
+ paused = false;
+ file.resume();
+ assert.ok(!file.paused);
+ }, 10);
+});
+
+file.addListener('end', function(chunk) {
+ callbacks.end++;
+});
+
+file.addListener('close', function() {
+ callbacks.close++;
+ assert.ok(!file.readable);
+
+ assert.equal(fs.readFileSync(fn), fileContent);
+});
var file2 = fs.createReadStream(fn);
file2.destroy(function(err) {