fs.ReadStream should emit Buffers
authorRyan Dahl <ry@tinyclouds.org>
Mon, 24 May 2010 22:47:40 +0000 (15:47 -0700)
committerRyan Dahl <ry@tinyclouds.org>
Mon, 24 May 2010 22:47:42 +0000 (15:47 -0700)
And do proper utf8 encoding.

lib/fs.js
test/simple/test-file-read-stream.js

index a9d5893..ad6b977 100644 (file)
--- a/lib/fs.js
+++ b/lib/fs.js
@@ -1,10 +1,13 @@
-var sys = require('sys'),
-    events = require('events'),
-    Buffer = require('buffer').Buffer;
+var sys = require('sys');
+var events = require('events');
+var Buffer = require('buffer').Buffer;
 
 var binding = process.binding('fs');
 var fs = exports;
 
+var kMinPoolSpace = 128;
+var kPoolSize = 40*1024;
+
 fs.Stats = binding.Stats;
 
 fs.Stats.prototype._checkModeProperty = function (property) {
@@ -565,8 +568,16 @@ fs.realpath = function (path, callback) {
     }
   }
   next();
+};
+
+var pool;
+function allocNewPool () {
+  pool = new Buffer(kPoolSize);
+  pool.used = 0;
 }
 
+
+
 fs.createReadStream = function(path, options) {
   return new ReadStream(path, options);
 };
@@ -580,7 +591,6 @@ var ReadStream = fs.ReadStream = function(path, options) {
   this.paused = false;
 
   this.flags = 'r';
-  this.encoding = 'binary';
   this.mode = 0666;
   this.bufferSize = 4 * 1024;
 
@@ -614,19 +624,40 @@ sys.inherits(ReadStream, events.EventEmitter);
 
 fs.FileReadStream = fs.ReadStream; // support the legacy name
 
-ReadStream.prototype.setEncoding = function(encoding) {
-  this.encoding = encoding;
+ReadStream.prototype.setEncoding = function (encoding) {
+  var Utf8Decoder = require("utf8decoder").Utf8Decoder; // lazy load
+  var self = this;
+  this._encoding = enc.toLowerCase();
+  if (this._encoding == 'utf-8' || this._encoding == 'utf8') {
+    this._decoder = new Utf8Decoder();
+    this._decoder.onString = function(str) {
+      self.emit('data', str);
+    };
+  } else if (this._decoder) {
+    delete this._decoder;
+  }
 };
 
+
 ReadStream.prototype._read = function () {
   var self = this;
   if (!self.readable || self.paused) return;
 
-  fs.read(self.fd,
-          self.bufferSize,
-          undefined,
-          self.encoding,
-          function(err, data, bytesRead) {
+  if (!pool || pool.length - pool.used < kMinPoolSpace) {
+    // discard the old pool. Can't add to the free list because
+    // users might have refernces to slices on it.
+    pool = null;
+    allocNewPool();
+  }
+
+  // Grab another reference to the pool in the case that while we're in the
+  // thread pool another read() finishes up the pool, and allocates a new
+  // one.
+  var thisPool = pool;
+  var toRead = Math.min(pool.length - pool.used, this.bufferSize);
+  var start = pool.used;
+
+  function afterRead (err, bytesRead) {
     if (err) {
       self.emit('error', err);
       self.readable = false;
@@ -639,20 +670,39 @@ ReadStream.prototype._read = function () {
       return;
     }
 
+    var b = thisPool.slice(start, start+bytesRead);
+
+    // Possible optimizition here?
+    // Reclaim some bytes if bytesRead < toRead?
+    // Would need to ensure that pool === thisPool.
+
     // do not emit events if the stream is paused
     if (self.paused) {
-      self.buffer = data;
+      self.buffer = b;
       return;
     }
 
     // do not emit events anymore after we declared the stream unreadable
-    if (!self.readable) {
-      return;
-    }
+    if (!self.readable) return;
 
-    self.emit('data', data);
+    self._emitData(b);
     self._read();
-  });
+  }
+
+  fs.read(self.fd, pool, pool.used, toRead, undefined, afterRead);
+  pool.used += toRead;
+};
+
+
+ReadStream.prototype._emitData = function (d) {
+  if (!this._encoding) {
+    this.emit('data', d);
+  } else if (this._decoder) {
+    this._decoder.write(d);
+  } else {
+    var string = d.toString(this._encoding, 0, d.length);
+    this.emit('data', string);
+  }
 };
 
 
@@ -664,7 +714,7 @@ ReadStream.prototype.forceClose = function (cb) {
     sys.error(readStreamForceCloseWarning);
   }
   return this.destroy(cb);
-}
+};
 
 
 ReadStream.prototype.destroy = function (cb) {
@@ -674,16 +724,12 @@ ReadStream.prototype.destroy = function (cb) {
   function close() {
     fs.close(self.fd, function(err) {
       if (err) {
-        if (cb) {
-          cb(err);
-        }
+        if (cb) cb(err);
         self.emit('error', err);
         return;
       }
 
-      if (cb) {
-        cb(null);
-      }
+      if (cb) cb(null);
       self.emit('close');
     });
   }
@@ -705,7 +751,7 @@ ReadStream.prototype.resume = function() {
   this.paused = false;
 
   if (this.buffer) {
-    this.emit('data', this.buffer);
+    this._emitData(this.buffer);
     this.buffer = null;
   }
 
@@ -858,7 +904,7 @@ WriteStream.prototype.forceClose = function (cb) {
     sys.error(writeStreamForceCloseWarning);
   }
   return this.destroy(cb);
-}
+};
 
 
 WriteStream.prototype.forceClose = function (cb) {
index a7fa8b6..c615030 100644 (file)
@@ -1,54 +1,65 @@
 require('../common');
 
-var
-  path = require('path'),
-  fs = require('fs'),
-  fn = path.join(fixturesDir, 'test_ca.pem'),
-  file = fs.createReadStream(fn),
-
-  callbacks = {
-    open: -1,
-    end: -1,
-    close: -1,
-    destroy: -1
-  },
-
-  paused = false,
-
-  fileContent = '';
-
-file
-  .addListener('open', function(fd) {
-    callbacks.open++;
-    assert.equal('number', typeof fd);
-    assert.ok(file.readable);
-  })
-  .addListener('error', function(err) {
-    throw err;
-  })
-  .addListener('data', function(data) {
-    assert.ok(!paused);
-    fileContent += data;
-    
-    paused = true;
-    file.pause();
-    assert.ok(file.paused);
-
-    setTimeout(function() {
-      paused = false;
-      file.resume();
-      assert.ok(!file.paused);
-    }, 10);
-  })
-  .addListener('end', function(chunk) {
-    callbacks.end++;
-  })
-  .addListener('close', function() {
-    callbacks.close++;
-    assert.ok(!file.readable);
-
-    assert.equal(fs.readFileSync(fn), fileContent);
-  });
+// TODO Improved this test. test_ca.pem is too small. A proper test would
+// great a large utf8 (with multibyte chars) file and stream it in,
+// performing sanity checks throughout.
+
+Buffer = require('buffer').Buffer;
+path = require('path');
+fs = require('fs');
+fn = path.join(fixturesDir, 'test_ca.pem');
+
+file = fs.createReadStream(fn);
+
+callbacks = {
+  open: -1,
+  end: -1,
+  data: -1,
+  close: -1,
+  destroy: -1
+};
+
+paused = false;
+
+fileContent = '';
+
+file.addListener('open', function(fd) {
+  callbacks.open++;
+  assert.equal('number', typeof fd);
+  assert.ok(file.readable);
+});
+
+file.addListener('error', function(err) {
+  throw err;
+});
+
+file.addListener('data', function(data) {
+  callbacks.data++;
+  assert.ok(data instanceof Buffer);
+  assert.ok(!paused);
+  fileContent += data;
+  
+  paused = true;
+  file.pause();
+  assert.ok(file.paused);
+
+  setTimeout(function() {
+    paused = false;
+    file.resume();
+    assert.ok(!file.paused);
+  }, 10);
+});
+
+file.addListener('end', function(chunk) {
+  callbacks.end++;
+});
+
+file.addListener('close', function() {
+  callbacks.close++;
+  assert.ok(!file.readable);
+
+  assert.equal(fs.readFileSync(fn), fileContent);
+});
 
 var file2 = fs.createReadStream(fn);
 file2.destroy(function(err) {