From f6e00759effd1d550657aa6ce0208ee04eda87bd Mon Sep 17 00:00:00 2001 From: =?utf8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 3 Mar 2010 12:39:41 +0100 Subject: [PATCH] Initial read stream implementation --- lib/fs.js | 90 ++++++++++++++++++++++++++++++++++++ test/simple/test-file-read-stream.js | 54 ++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 test/simple/test-file-read-stream.js diff --git a/lib/fs.js b/lib/fs.js index 24542ae..ac13b26 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -376,6 +376,96 @@ exports.realpath = function(path, callback) { }); } +exports.fileReadStream = function(path, options) { + return new FileReadStream(path, options); +}; + +var FileReadStream = exports.FileReadStream = function(path, options) { + this.path = path; + this.fd = null; + this.readable = true; + this.paused = false; + + this.flags = 'r'; + this.encoding = 'binary'; + this.mode = 0666; + this.bufferSize = 4 * 1024; + + process.mixin(this, options || {}); + + var + self = this, + buffer = []; + + function read() { + if (!self.readable || self.paused) { + return; + } + + fs.read(self.fd, self.bufferSize, undefined, self.encoding, function(err, data, bytesRead) { + if (bytesRead === 0) { + self.emit('end'); + self.close(); + return; + } + + // do not emit events if the stream is paused + if (self.paused) { + buffer.push(data); + return; + } + + self.emit('data', data); + read(); + }); + } + + fs.open(this.path, this.flags, this.mode, function(err, fd) { + if (err) { + self.emit('error', err); + return; + } + + self.fd = fd; + self.emit('open', fd); + read(); + }); + + this.close = function() { + this.readable = false; + fs.close(this.fd, function(err) { + if (err) { + self.emit('error', err); + return; + } + + self.emit('close'); + }); + }; + + this.pause = function() { + this.paused = true; + }; + + this.resume = function() { + this.paused = false; + + // emit any buffered read events before continuing + var data; + while (!this.paused) { + data = buffer.shift(); + if (data === undefined) { + break; + } + + self.emit('data', data); + } + + read(); + }; +}; +FileReadStream.prototype.__proto__ = process.EventEmitter.prototype; + exports.fileWriteStream = function(path, options) { return new FileWriteStream(path, options); }; diff --git a/test/simple/test-file-read-stream.js b/test/simple/test-file-read-stream.js new file mode 100644 index 0000000..3b358b5 --- /dev/null +++ b/test/simple/test-file-read-stream.js @@ -0,0 +1,54 @@ +process.mixin(require('../common')); + +var + fn = path.join(fixturesDir, 'multipart.js'), + file = fs.fileReadStream(fn), + + callbacks = { + open: -1, + end: -1, + close: -1 + }, + + paused = false, + + fileContent = ''; + +file + .addListener('open', function(fd) { + callbacks.open++; + assert.equal('number', typeof fd); + assert.ok(file.readable); + }) + .addListener('error', function(err) { + throw err; + }) + .addListener('data', function(data) { + assert.ok(!paused); + fileContent += data; + + paused = true; + file.pause(); + assert.ok(file.paused); + + setTimeout(function() { + paused = false; + file.resume(); + assert.ok(!file.paused); + }, 10); + }) + .addListener('end', function(chunk) { + callbacks.end++; + }) + .addListener('close', function() { + callbacks.close++; + assert.ok(!file.readable); + + assert.equal(fs.readFileSync(fn), fileContent); + }); + +process.addListener('exit', function() { + for (var k in callbacks) { + assert.equal(0, callbacks[k], k+' count off by '+callbacks[k]); + } +}); \ No newline at end of file -- 2.7.4