From 264a67aed2f4ac65a9b3f6fd3c2622793e83e6ad Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 9 Mar 2010 11:59:42 -0800 Subject: [PATCH] Update net.js for new stream API --- lib/net.js | 87 ++++++++++++++++++++------------------ test/simple/test-net-fd-passing.js | 2 +- test/simple/test-net-pingpong.js | 8 ++-- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/lib/net.js b/lib/net.js index 8e9abd9..656f824 100644 --- a/lib/net.js +++ b/lib/net.js @@ -8,6 +8,7 @@ function debug (x) { } +var Buffer = process.Buffer; var IOWatcher = process.IOWatcher; var assert = process.assert; var socket = process.socket; @@ -61,13 +62,13 @@ var ioWatchers = new FreeList("iowatcher", 100, function () { var nb = 0; var buffers = new FreeList("buffer", 100, function (l) { - return new process.Buffer(500); + return new Buffer(500); }); // Allocated on demand. var recvBuffer = null; function allocRecvBuffer () { - recvBuffer = new process.Buffer(40*1024); + recvBuffer = new Buffer(40*1024); recvBuffer.used = 0; } @@ -138,19 +139,20 @@ function initSocket (self) { }; self.readable = false; - self.sendQueue = []; // queue of buffers that need to be written to socket + self._writeQueue = []; // queue of buffers that need to be written to socket // XXX use link list? - self.sendQueueSize = 0; // in bytes, not to be confused with sendQueue.length! - self.sendMessageQueueSize = 0; // number of messages remaining to be sent + self._writeQueueSize = 0; // in bytes, not to be confused with _writeQueue.length! + self._writeMessageQueueSize = 0; // number of messages remaining to be sent + self._doFlush = function () { // Socket becomes writeable on connect() but don't flush if there's // nothing actually to write - if ((self.sendQueueSize == 0) && (self.sendMessageQueueSize == 0)) { + if ((self._writeQueueSize == 0) && (self._writeMessageQueueSize == 0)) { return; } if (self.flush()) { - assert(self.sendQueueSize == 0); - assert(self.sendMessageQueueSize == 0); + assert(self._writeQueueSize == 0); + assert(self._writeMessageQueueSize == 0); if (self._events && self._events['drain']) self.emit("drain"); if (self.ondrain) self.ondrain(); // Optimization @@ -195,19 +197,19 @@ Socket.prototype._allocateSendBuffer = function () { b.used = 0; b.sent = 0; b.isMsg = false; - this.sendQueue.push(b); + this._writeQueue.push(b); return b; }; -Socket.prototype._sendString = function (data, encoding) { +Socket.prototype._writeString = function (data, encoding) { var self = this; if (!self.writable) throw new Error('Socket is not writable'); var buffer; - if (self.sendQueue.length == 0) { + if (self._writeQueue.length == 0) { buffer = self._allocateSendBuffer(); } else { - buffer = self._sendQueueLast(); + buffer = self.__writeQueueLast(); if (buffer.used == buffer.length) { buffer = self._allocateSendBuffer(); } @@ -230,7 +232,7 @@ Socket.prototype._sendString = function (data, encoding) { charsWritten = buffer.utf8Write(data, buffer.used, buffer.length - buffer.used); - bytesWritten = process.Buffer.utf8ByteLength(data.slice(0, charsWritten)); + bytesWritten = Buffer.utf8ByteLength(data.slice(0, charsWritten)); } else { // ascii buffer.isFd = false; @@ -242,9 +244,9 @@ Socket.prototype._sendString = function (data, encoding) { buffer.used += bytesWritten; if (buffer.isFd) { - self.sendMessageQueueSize += 1; + self._writeMessageQueueSize += 1; } else { - self.sendQueueSize += bytesWritten; + self._writeQueueSize += bytesWritten; } debug('charsWritten ' + charsWritten); @@ -252,40 +254,45 @@ Socket.prototype._sendString = function (data, encoding) { // If we didn't finish, then recurse with the rest of the string. if (charsWritten < data.length) { - debug('recursive send'); - self._sendString(data.slice(charsWritten), encoding); + debug('recursive write'); + self._writeString(data.slice(charsWritten), encoding); } }; -Socket.prototype._sendQueueLast = function () { - return this.sendQueue.length > 0 ? this.sendQueue[this.sendQueue.length-1] +Socket.prototype.__writeQueueLast = function () { + return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1] : null; }; +Socket.prototype.send = function () { + throw new Error('send renamed to write'); +}; + + // Returns true if all the data was flushed to socket. Returns false if // something was queued. If data was queued, then the "drain" event will // signal when it has been finally flushed to socket. -Socket.prototype.send = function (data, encoding) { +Socket.prototype.write = function (data, encoding) { var self = this; if (!self.writable) throw new Error('Socket is not writable'); - if (self._sendQueueLast() == END_OF_FILE) { + if (self.__writeQueueLast() == END_OF_FILE) { throw new Error('socket.close() called already; cannot write.'); } if (typeof(data) == 'string') { - self._sendString(data, encoding); + self._writeString(data, encoding); } else { - // data is a process.Buffer - // walk through the sendQueue, find the first empty buffer + // data is a Buffer + // walk through the _writeQueue, find the first empty buffer //var inserted = false; data.sent = 0; data.used = data.length; - self.sendQueue.push(data); - self.sendQueueSize += data.used; + self._writeQueue.push(data); + self._writeQueueSize += data.used; } return this.flush(); }; @@ -296,7 +303,7 @@ Socket.prototype.sendFD = function(socketToPass) { if (!self.writable) throw new Error('Socket is not writable'); - if (self._sendQueueLast() == END_OF_FILE) { + if (self.__writeQueueLast() == END_OF_FILE) { throw new Error('socket.close() called already; cannot write.'); } @@ -308,7 +315,7 @@ Socket.prototype.sendFD = function(socketToPass) { throw new Error('Provided arg is not a socket'); } - return self.send(socketToPass.fd.toString(), "fd"); + return self.write(socketToPass.fd.toString(), "fd"); }; @@ -318,10 +325,10 @@ Socket.prototype.flush = function () { var self = this; var bytesWritten; - while (self.sendQueue.length) { + while (self._writeQueue.length) { if (!self.writable) throw new Error('Socket is not writable'); - var b = self.sendQueue[0]; + var b = self._writeQueue[0]; if (b == END_OF_FILE) { self._shutdown(); @@ -330,7 +337,7 @@ Socket.prototype.flush = function () { if (b.sent == b.used) { // shift! - self.sendQueue.shift(); + self._writeQueue.shift(); buffers.free(b); continue; } @@ -340,7 +347,7 @@ Socket.prototype.flush = function () { try { if (b.isFd) { fdToSend = parseInt(b.asciiSlice(b.sent, b.used - b.sent)); - bytesWritten = sendFD(self.fd, fdToSend); + bytesWritten = writeFD(self.fd, fdToSend); } else { bytesWritten = write(self.fd, b, @@ -355,16 +362,16 @@ Socket.prototype.flush = function () { if (bytesWritten === null) { // could not flush everything self._writeWatcher.start(); - assert(self.sendQueueSize > 0); + assert(self._writeQueueSize > 0); return false; } if (b.isFd) { b.sent = b.used; - self.sendMessageQueueSize -= 1; + self._writeMessageQueueSize -= 1; debug('sent fd: ' + fdToSend); } else { b.sent += bytesWritten; - self.sendQueueSize -= bytesWritten; + self._writeQueueSize -= bytesWritten; debug('bytes sent: ' + b.sent); } } @@ -446,9 +453,9 @@ Socket.prototype.forceClose = function (exception) { // recvBuffer is shared between sockets, so don't need to free it here. var b; - while (this.sendQueue.length) { - b = this.sendQueue.shift(); - if (b instanceof process.Buffer) buffers.free(b); + while (this._writeQueue.length) { + b = this._writeQueue.shift(); + if (b instanceof Buffer) buffers.free(b); } if (this._writeWatcher) { @@ -489,8 +496,8 @@ Socket.prototype._shutdown = function () { Socket.prototype.close = function () { if (this.writable) { - if (this._sendQueueLast() != END_OF_FILE) { - this.sendQueue.push(END_OF_FILE); + if (this.__writeQueueLast() != END_OF_FILE) { + this._writeQueue.push(END_OF_FILE); this.flush(); } } diff --git a/test/simple/test-net-fd-passing.js b/test/simple/test-net-fd-passing.js index 0e8620c..1cf577b 100644 --- a/test/simple/test-net-fd-passing.js +++ b/test/simple/test-net-fd-passing.js @@ -29,7 +29,7 @@ function fdPassingTest(path, port) { var client = net.createConnection(port); client.addListener("connect", function() { - client.send(message); + client.write(message); }); client.addListener("data", function(data) { diff --git a/test/simple/test-net-pingpong.js b/test/simple/test-net-pingpong.js index ded3153..560f943 100644 --- a/test/simple/test-net-pingpong.js +++ b/test/simple/test-net-pingpong.js @@ -25,7 +25,7 @@ function pingPongTest (port, host) { assert.equal(true, socket.readable); assert.equal(true, count <= N); if (/PING/.exec(data)) { - socket.send("PONG"); + socket.write("PONG"); } }); @@ -50,7 +50,7 @@ function pingPongTest (port, host) { client.addListener("connect", function () { assert.equal(true, client.readable); assert.equal(true, client.writable); - client.send("PING"); + client.write("PING"); }); client.addListener("data", function (data) { @@ -69,10 +69,10 @@ function pingPongTest (port, host) { } if (count < N) { - client.send("PING"); + client.write("PING"); } else { sent_final_ping = true; - client.send("PING"); + client.write("PING"); client.close(); } }); -- 2.7.4