From: isaacs Date: Mon, 4 Mar 2013 03:14:06 +0000 (-0800) Subject: stream: _write takes an encoding argument X-Git-Tag: v0.9.12~23 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=426b4c625802c7b6913fa09237aa9745bf3ae84a;p=platform%2Fupstream%2Fnodejs.git stream: _write takes an encoding argument This vastly reduces the overhead of decodeStrings:false streams, such as net and http. --- diff --git a/doc/api/stream.markdown b/doc/api/stream.markdown index 2b192d6..c419266 100644 --- a/doc/api/stream.markdown +++ b/doc/api/stream.markdown @@ -436,8 +436,8 @@ Resumes the incoming `'data'` events after a `pause()`. A `Writable` Stream has the following methods, members, and events. Note that `stream.Writable` is an abstract class designed to be -extended with an underlying implementation of the `_write(chunk, cb)` -method. (See below.) +extended with an underlying implementation of the +`_write(chunk, encoding, cb)` method. (See below.) ### new stream.Writable([options]) @@ -451,10 +451,16 @@ In classes that extend the Writable class, make sure to call the constructor so that the buffering settings can be properly initialized. -### writable.\_write(chunk, callback) +### writable.\_write(chunk, encoding, callback) -* `chunk` {Buffer | Array} The data to be written -* `callback` {Function} Called with an error, or null when finished +* `chunk` {Buffer | String} The chunk to be written. Will always + be a buffer unless the `decodeStrings` option was set to `false`. +* `encoding` {String} If the chunk is a string, then this is the + encoding type. Ignore chunk is a buffer. Note that chunk will + **always** be a buffer unless the `decodeStrings` option is + explicitly set to `false`. +* `callback` {Function} Call this function (optionally with an error + argument) when you are done processing the supplied chunk. All Writable stream implementations must provide a `_write` method to send data to the underlying resource. @@ -467,9 +473,12 @@ Call the callback using the standard `callback(error)` pattern to signal that the write completed successfully or with an error. If the `decodeStrings` flag is set in the constructor options, then -`chunk` will be an array rather than a Buffer. This is to support +`chunk` may be a string rather than a Buffer, and `encoding` will +indicate the sort of string that it is. This is to support implementations that have an optimized handling for certain string -data encodings. +data encodings. If you do not explicitly set the `decodeStrings` +option to `false`, then you can safely ignore the `encoding` argument, +and assume that `chunk` will always be a Buffer. This method is prefixed with an underscore because it is internal to the class that defines it, and should not be called directly by user @@ -543,13 +552,13 @@ TCP socket connection. Note that `stream.Duplex` is an abstract class designed to be extended with an underlying implementation of the `_read(size)` -and `_write(chunk, callback)` methods as you would with a Readable or +and `_write(chunk, encoding, callback)` methods as you would with a Readable or Writable stream class. Since JavaScript doesn't have multiple prototypal inheritance, this class prototypally inherits from Readable, and then parasitically from Writable. It is thus up to the user to implement both the lowlevel -`_read(n)` method as well as the lowlevel `_write(chunk,cb)` method +`_read(n)` method as well as the lowlevel `_write(chunk, encoding, cb)` method on extension duplex classes. ### new stream.Duplex(options) @@ -589,9 +598,12 @@ In classes that extend the Transform class, make sure to call the constructor so that the buffering settings can be properly initialized. -### transform.\_transform(chunk, callback) +### transform.\_transform(chunk, encoding, callback) -* `chunk` {Buffer} The chunk to be transformed. +* `chunk` {Buffer | String} The chunk to be transformed. Will always + be a buffer unless the `decodeStrings` option was set to `false`. +* `encoding` {String} If the chunk is a string, then this is the + encoding type. (Ignore if `decodeStrings` chunk is a buffer.) * `callback` {Function} Call this function (optionally with an error argument) when you are done processing the supplied chunk. @@ -671,7 +683,7 @@ function SimpleProtocol(options) { SimpleProtocol.prototype = Object.create( Transform.prototype, { constructor: { value: SimpleProtocol }}); -SimpleProtocol.prototype._transform = function(chunk, done) { +SimpleProtocol.prototype._transform = function(chunk, encoding, done) { if (!this._inBody) { // check if the chunk has a \n\n var split = -1; diff --git a/lib/_stream_passthrough.js b/lib/_stream_passthrough.js index 557d6de..a5e9864 100644 --- a/lib/_stream_passthrough.js +++ b/lib/_stream_passthrough.js @@ -36,6 +36,6 @@ function PassThrough(options) { Transform.call(this, options); } -PassThrough.prototype._transform = function(chunk, cb) { +PassThrough.prototype._transform = function(chunk, encoding, cb) { cb(null, chunk); }; diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js index 222b139..013bebd 100644 --- a/lib/_stream_transform.js +++ b/lib/_stream_transform.js @@ -155,10 +155,11 @@ Transform.prototype._transform = function(chunk, output, cb) { throw new Error('not implemented'); }; -Transform.prototype._write = function(chunk, cb) { +Transform.prototype._write = function(chunk, encoding, cb) { var ts = this._transformState; ts.writecb = cb; ts.writechunk = chunk; + ts.writeencoding = encoding; if (!ts.transforming) { var rs = this._readableState; if (ts.needTransform || @@ -176,7 +177,7 @@ Transform.prototype._read = function(n) { if (ts.writechunk && ts.writecb && !ts.transforming) { ts.transforming = true; - this._transform(ts.writechunk, ts.afterTransform); + this._transform(ts.writechunk, ts.writeencoding, ts.afterTransform); } else { // mark that we need a transform, so that any data that comes in // will get processed, now that we've asked for it. diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 2dff2d8..57926ad 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -146,15 +146,6 @@ function validChunk(stream, state, chunk, cb) { return valid; } -function decodeChunk(state, chunk, encoding) { - if (!state.objectMode && - state.decodeStrings !== false && - typeof chunk === 'string') { - chunk = new Buffer(chunk, encoding); - } - return chunk; -} - Writable.prototype.write = function(chunk, encoding, cb) { var state = this._writableState; var ret = false; @@ -177,6 +168,15 @@ Writable.prototype.write = function(chunk, encoding, cb) { return ret; }; +function decodeChunk(state, chunk, encoding) { + if (!state.objectMode && + state.decodeStrings !== false && + typeof chunk === 'string') { + chunk = new Buffer(chunk, encoding); + } + return chunk; +} + // if we're already writing something, then just put this // in the queue, and wait our turn. Otherwise, call _write // If we return false, then we need a drain event, so set that flag. @@ -184,17 +184,13 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) { chunk = decodeChunk(state, chunk, encoding); var len = state.objectMode ? 1 : chunk.length; - // XXX Remove. _write() should take an encoding. - if (state.decodeStrings === false) - chunk = [chunk, encoding]; - state.length += len; var ret = state.length < state.highWaterMark; state.needDrain = !ret; if (state.writing) - state.buffer.push([chunk, cb]); // XXX [chunk,encoding,cb] + state.buffer.push([chunk, encoding, cb]); else doWrite(stream, state, len, chunk, encoding, cb); @@ -206,8 +202,7 @@ function doWrite(stream, state, len, chunk, encoding, cb) { state.writecb = cb; state.writing = true; state.sync = true; - // XXX stream._write(chunk, encoding, state.onwrite) - stream._write(chunk, state.onwrite); + stream._write(chunk, encoding, state.onwrite); state.sync = false; } @@ -271,21 +266,12 @@ function onwriteDrain(stream, state) { function clearBuffer(stream, state) { state.bufferProcessing = true; - // XXX buffer entry should be [chunk, encoding, cb] for (var c = 0; c < state.buffer.length; c++) { - var chunkCb = state.buffer[c]; - var chunk = chunkCb[0]; - var cb = chunkCb[1]; - var encoding = ''; - var len; - - if (state.objectMode) - len = 1; - else if (false === state.decodeStrings) { - len = chunk[0].length; - encoding = chunk[1]; - } else - len = chunk.length; + var entry = state.buffer[c]; + var chunk = entry[0]; + var encoding = entry[1]; + var cb = entry[2]; + var len = state.objectMode ? 1 : chunk.length; doWrite(stream, state, len, chunk, encoding, cb); @@ -306,10 +292,8 @@ function clearBuffer(stream, state) { state.buffer.length = 0; } -Writable.prototype._write = function(chunk, cb) { - process.nextTick(function() { - cb(new Error('not implemented')); - }); +Writable.prototype._write = function(chunk, encoding, cb) { + cb(new Error('not implemented')); }; Writable.prototype.end = function(chunk, encoding, cb) { diff --git a/lib/crypto.js b/lib/crypto.js index 500e14d..01d4b71 100644 --- a/lib/crypto.js +++ b/lib/crypto.js @@ -160,8 +160,8 @@ function Hash(algorithm, options) { util.inherits(Hash, stream.Transform); -Hash.prototype._transform = function(chunk, callback) { - this._binding.update(chunk); +Hash.prototype._transform = function(chunk, encoding, callback) { + this._binding.update(chunk, encoding); callback(); }; @@ -226,8 +226,8 @@ function Cipher(cipher, password, options) { util.inherits(Cipher, stream.Transform); -Cipher.prototype._transform = function(chunk, callback) { - this.push(this._binding.update(chunk)); +Cipher.prototype._transform = function(chunk, encoding, callback) { + this.push(this._binding.update(chunk, encoding)); callback(); }; @@ -351,8 +351,8 @@ function Sign(algorithm, options) { util.inherits(Sign, stream.Writable); -Sign.prototype._write = function(chunk, callback) { - this._binding.update(chunk); +Sign.prototype._write = function(chunk, encoding, callback) { + this._binding.update(chunk, encoding); callback(); }; diff --git a/lib/fs.js b/lib/fs.js index d467c5e..39a34ab 100644 --- a/lib/fs.js +++ b/lib/fs.js @@ -1650,12 +1650,14 @@ WriteStream.prototype.open = function() { }; -WriteStream.prototype._write = function(data, cb) { +WriteStream.prototype._write = function(data, encoding, cb) { if (!Buffer.isBuffer(data)) return this.emit('error', new Error('Invalid data')); if (typeof this.fd !== 'number') - return this.once('open', this._write.bind(this, data, cb)); + return this.once('open', function() { + this._write(data, encoding, cb); + }); var self = this; fs.write(this.fd, data, 0, data.length, this.pos, function(er, bytes) { diff --git a/lib/net.js b/lib/net.js index 35223c3..4821665 100644 --- a/lib/net.js +++ b/lib/net.js @@ -161,7 +161,8 @@ function Socket(options) { initSocketHandle(this); - this._pendingWrite = null; + this._pendingData = null; + this._pendingEncoding = ''; // handle strings directly this._writableState.decodeStrings = false; @@ -583,22 +584,20 @@ Socket.prototype.write = function(chunk, encoding, cb) { }; -Socket.prototype._write = function(dataEncoding, cb) { - // assert(Array.isArray(dataEncoding)); - var data = dataEncoding[0]; - var encoding = dataEncoding[1] || 'utf8'; - +Socket.prototype._write = function(data, encoding, cb) { // If we are still connecting, then buffer this for later. // The Writable logic will buffer up any more writes while // waiting for this one to be done. if (this._connecting) { - this._pendingWrite = dataEncoding; + this._pendingData = data; + this._pendingEncoding = encoding; this.once('connect', function() { - this._write(dataEncoding, cb); + this._write(data, encoding, cb); }); return; } - this._pendingWrite = null; + this._pendingData = null; + this._pendingEncoding = ''; timers.active(this); @@ -651,15 +650,16 @@ function createWriteReq(handle, data, encoding) { Socket.prototype.__defineGetter__('bytesWritten', function() { var bytes = this._bytesDispatched, state = this._writableState, - pending = this._pendingWrite; + data = this._pendingData, + encoding = this._pendingEncoding; state.buffer.forEach(function(el) { el = el[0]; bytes += Buffer.byteLength(el[0], el[1]); }); - if (pending) - bytes += Buffer.byteLength(pending[0], pending[1]); + if (data) + bytes += Buffer.byteLength(data, encoding); return bytes; }); diff --git a/lib/tls.js b/lib/tls.js index 86ace15..5157614 100644 --- a/lib/tls.js +++ b/lib/tls.js @@ -239,6 +239,7 @@ function CryptoStream(pair, options) { this.pair = pair; this._pending = null; + this._pendingEncoding = ''; this._pendingCallback = null; this._doneFlag = false; this._resumingSession = false; @@ -300,7 +301,7 @@ function onCryptoStreamEnd() { } -CryptoStream.prototype._write = function write(data, cb) { +CryptoStream.prototype._write = function write(data, encoding, cb) { assert(this._pending === null); // Black-hole data @@ -361,6 +362,7 @@ CryptoStream.prototype._write = function write(data, cb) { // No write has happened this._pending = data; + this._pendingEncoding = encoding; this._pendingCallback = cb; if (this === this.pair.cleartext) { @@ -373,11 +375,13 @@ CryptoStream.prototype._write = function write(data, cb) { CryptoStream.prototype._writePending = function writePending() { var data = this._pending, + encoding = this._pendingEncoding, cb = this._pendingCallback; this._pending = null; + this._pendingEncoding = ''; this._pendingCallback = null; - this._write(data, cb); + this._write(data, encoding, cb); }; diff --git a/lib/zlib.js b/lib/zlib.js index d3aa858..dc0aeca 100644 --- a/lib/zlib.js +++ b/lib/zlib.js @@ -309,7 +309,7 @@ Zlib.prototype.reset = function reset() { }; Zlib.prototype._flush = function(callback) { - this._transform(null, callback); + this._transform(null, '', callback); }; Zlib.prototype.flush = function(callback) { @@ -343,7 +343,7 @@ Zlib.prototype.close = function(callback) { }); }; -Zlib.prototype._transform = function(chunk, cb) { +Zlib.prototype._transform = function(chunk, encoding, cb) { var flushFlag; var ws = this._writableState; var ending = ws.ending || ws.ended; diff --git a/test/simple/test-stream2-finish-pipe.js b/test/simple/test-stream2-finish-pipe.js index bcb57a7..39b274f 100644 --- a/test/simple/test-stream2-finish-pipe.js +++ b/test/simple/test-stream2-finish-pipe.js @@ -29,7 +29,7 @@ r._read = function(size) { }; var w = new stream.Writable(); -w._write = function(data, cb) { +w._write = function(data, encoding, cb) { cb(null); }; diff --git a/test/simple/test-stream2-objects.js b/test/simple/test-stream2-objects.js index 8939ad7..ba626cb 100644 --- a/test/simple/test-stream2-objects.js +++ b/test/simple/test-stream2-objects.js @@ -261,7 +261,7 @@ test('high watermark push', function(t) { test('can write objects to stream', function(t) { var w = new Writable({ objectMode: true }); - w._write = function(chunk, cb) { + w._write = function(chunk, encoding, cb) { assert.deepEqual(chunk, { foo: 'bar' }); cb(); }; @@ -278,7 +278,7 @@ test('can write multiple objects to stream', function(t) { var w = new Writable({ objectMode: true }); var list = []; - w._write = function(chunk, cb) { + w._write = function(chunk, encoding, cb) { list.push(chunk); cb(); }; @@ -303,7 +303,7 @@ test('can write strings as objects', function(t) { }); var list = []; - w._write = function(chunk, cb) { + w._write = function(chunk, encoding, cb) { list.push(chunk); process.nextTick(cb); }; @@ -328,7 +328,7 @@ test('buffers finish until cb is called', function(t) { }); var called = false; - w._write = function(chunk, cb) { + w._write = function(chunk, encoding, cb) { assert.equal(chunk, 'foo'); process.nextTick(function() { diff --git a/test/simple/test-stream2-pipe-error-handling.js b/test/simple/test-stream2-pipe-error-handling.js index 82c9a79..cf7531c 100644 --- a/test/simple/test-stream2-pipe-error-handling.js +++ b/test/simple/test-stream2-pipe-error-handling.js @@ -40,7 +40,7 @@ var stream = require('stream'); }; var dest = new stream.Writable(); - dest._write = function(chunk, cb) { + dest._write = function(chunk, encoding, cb) { cb(); }; @@ -80,7 +80,7 @@ var stream = require('stream'); }; var dest = new stream.Writable(); - dest._write = function(chunk, cb) { + dest._write = function(chunk, encoding, cb) { cb(); }; diff --git a/test/simple/test-stream2-push.js b/test/simple/test-stream2-push.js index 29b438d..b63edc3 100644 --- a/test/simple/test-stream2-push.js +++ b/test/simple/test-stream2-push.js @@ -90,9 +90,9 @@ var expectWritten = 'asdfgasdfgasdfgasdfg', 'asdfgasdfgasdfgasdfg' ]; -writer._write = function(chunk, cb) { - console.error('WRITE %s', chunk[0]); - written.push(chunk[0]); +writer._write = function(chunk, encoding, cb) { + console.error('WRITE %s', chunk); + written.push(chunk); process.nextTick(cb); }; diff --git a/test/simple/test-stream2-stderr-sync.js b/test/simple/test-stream2-stderr-sync.js index 992c79d..2b83617 100644 --- a/test/simple/test-stream2-stderr-sync.js +++ b/test/simple/test-stream2-stderr-sync.js @@ -61,7 +61,7 @@ function child0() { Writable.call(this, opts); } - W.prototype._write = function(chunk, cb) { + W.prototype._write = function(chunk, encoding, cb) { var req = handle.writeUtf8String(chunk.toString() + '\n'); // here's the problem. // it needs to tell the Writable machinery that it's ok to write diff --git a/test/simple/test-stream2-transform.js b/test/simple/test-stream2-transform.js index baef18d..a329dee 100644 --- a/test/simple/test-stream2-transform.js +++ b/test/simple/test-stream2-transform.js @@ -67,7 +67,7 @@ test('writable side consumption', function(t) { }); var transformed = 0; - tx._transform = function(chunk, cb) { + tx._transform = function(chunk, encoding, cb) { transformed += chunk.length; tx.push(chunk); cb(); @@ -106,7 +106,7 @@ test('passthrough', function(t) { test('simple transform', function(t) { var pt = new Transform; - pt._transform = function(c, cb) { + pt._transform = function(c, e, cb) { var ret = new Buffer(c.length); ret.fill('x'); pt.push(ret); @@ -128,7 +128,7 @@ test('simple transform', function(t) { test('async passthrough', function(t) { var pt = new Transform; - pt._transform = function(chunk, cb) { + pt._transform = function(chunk, encoding, cb) { setTimeout(function() { pt.push(chunk); cb(); @@ -154,7 +154,7 @@ test('assymetric transform (expand)', function(t) { var pt = new Transform; // emit each chunk 2 times. - pt._transform = function(chunk, cb) { + pt._transform = function(chunk, encoding, cb) { setTimeout(function() { pt.push(chunk); setTimeout(function() { @@ -189,7 +189,7 @@ test('assymetric transform (compress)', function(t) { // or whatever's left. pt.state = ''; - pt._transform = function(chunk, cb) { + pt._transform = function(chunk, encoding, cb) { if (!chunk) chunk = ''; var s = chunk.toString(); @@ -359,7 +359,7 @@ test('passthrough facaded', function(t) { test('object transform (json parse)', function(t) { console.error('json parse stream'); var jp = new Transform({ objectMode: true }); - jp._transform = function(data, cb) { + jp._transform = function(data, encoding, cb) { try { jp.push(JSON.parse(data)); cb(); @@ -399,7 +399,7 @@ test('object transform (json parse)', function(t) { test('object transform (json stringify)', function(t) { console.error('json parse stream'); var js = new Transform({ objectMode: true }); - js._transform = function(data, cb) { + js._transform = function(data, encoding, cb) { try { js.push(JSON.stringify(data)); cb(); diff --git a/test/simple/test-stream2-writable.js b/test/simple/test-stream2-writable.js index 5376602..1c1bb97 100644 --- a/test/simple/test-stream2-writable.js +++ b/test/simple/test-stream2-writable.js @@ -33,7 +33,7 @@ function TestWriter() { this.written = 0; } -TestWriter.prototype._write = function(chunk, cb) { +TestWriter.prototype._write = function(chunk, encoding, cb) { // simulate a small unpredictable latency setTimeout(function() { this.buffer.push(chunk.toString()); @@ -186,11 +186,10 @@ test('write no bufferize', function(t) { decodeStrings: false }); - tw._write = function(chunk, cb) { - assert(Array.isArray(chunk)); - assert(typeof chunk[0] === 'string'); - chunk = new Buffer(chunk[0], chunk[1]); - return TestWriter.prototype._write.call(this, chunk, cb); + tw._write = function(chunk, encoding, cb) { + assert(typeof chunk === 'string'); + chunk = new Buffer(chunk, encoding); + return TestWriter.prototype._write.call(this, chunk, encoding, cb); }; var encodings = @@ -279,7 +278,7 @@ test('end callback after .write() call', function (t) { test('encoding should be ignored for buffers', function(t) { var tw = new W(); var hex = '018b5e9a8f6236ffe30e31baf80d2cf6eb'; - tw._write = function(chunk, cb) { + tw._write = function(chunk, encoding, cb) { t.equal(chunk.toString('hex'), hex); t.end(); };