From dc0556c8cde5a3b81a1f7a85e29171e252dcc85f Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 17 Jun 2011 17:10:12 +0200 Subject: [PATCH] net_uv: Implement end(), destroySoon() --- lib/net_uv.js | 85 +++++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 71 insertions(+), 14 deletions(-) diff --git a/lib/net_uv.js b/lib/net_uv.js index d637f0c..9f9441a 100644 --- a/lib/net_uv.js +++ b/lib/net_uv.js @@ -5,6 +5,11 @@ var util = require('util'); var assert = require('assert'); var TCP = process.binding('tcp_wrap').TCP; +/* Bit flags for socket._flags */ +var FLAG_GOT_EOF = 1 << 0; +var FLAG_SHUTDOWN = 1 << 1; +var FLAG_DESTROY_SOON = 1 << 2; + var debug; if (process.env.NODE_DEBUG && /net/.test(process.env.NODE_DEBUG)) { @@ -40,7 +45,11 @@ function Socket(options) { this._handle.socket = this; this._handle.onread = onread; + this.allowHalfOpen = options ? (options.allowHalfOpen || false) : false; + this._writeRequests = []; + + this._flags = 0; } util.inherits(Socket, stream.Stream); @@ -68,16 +77,12 @@ Object.defineProperty(Socket.prototype, 'readyState', { if (this._connecting) { return 'opening'; } else if (this.readable && this.writable) { - assert(typeof this.fd === 'number'); return 'open'; } else if (this.readable && !this.writable) { - assert(typeof this.fd === 'number'); return 'readOnly'; } else if (!this.readable && this.writable) { - assert(typeof this.fd === 'number'); return 'writeOnly'; } else { - assert(typeof this.fd !== 'number'); return 'closed'; } } @@ -101,13 +106,42 @@ Socket.prototype.resume = function() { }; -Socket.prototype.end = function() { - throw new Error("implement me"); +Socket.prototype.end = function(data, encoding) { + if (!this.writable) return; + this.writable = false; + + if (data) this.write(data, encoding); + DTRACE_NET_STREAM_END(this); + + if (this._flags & FLAG_GOT_EOF) { + this.destroySoon(); + } else { + this._flags |= FLAG_SHUTDOWN; + var shutdownReq = this._handle.shutdown(); + shutdownReq.oncomplete = afterShutdown; + } }; +function afterShutdown(status, handle, req) { + var self = handle.socket; + + assert.ok(self._flags & FLAG_SHUTDOWN); + + if (self._flags & FLAG_GOT_EOF) { + self.destroy(); + } else { + } +} + + Socket.prototype.destroySoon = function() { - throw new Error("implement me"); + this.writable = false; + this._flags |= FLAG_DESTROY_SOON; + + if (this._writeRequests.length == 0) { + this.destroy(); + } }; @@ -167,8 +201,12 @@ function onread(buffer, offset, length) { // EOF self.readable = false; + assert.ok(!(self._flags & FLAG_GOT_EOF)); + self._flags |= FLAG_GOT_EOF; + + // We call destroy() before end(). 'close' not emitted until nextTick so + // the 'end' event will come first as required. if (!self.writable) self.destroy(); - // Note: 'close' not emitted until nextTick. if (!self.allowHalfOpen) self.end(); if (self._events && self._events['end']) self.emit('end'); @@ -230,7 +268,15 @@ function afterWrite(status, handle, req, buffer) { var req_ = self._writeRequests.shift(); assert.equal(req, req_); + if (self._writeRequests.length == 0) { + self.emit('drain'); + } + if (req.cb) req.cb(); + + if (self._writeRequests.length == 0 && self._flags & FLAG_DESTROY_SOON) { + self.destroy(); + } } @@ -257,12 +303,13 @@ Socket.prototype.connect = function(port, host) { // TODO retrun promise from Socket.prototype.connect which // wraps _connectReq. - assert.ok(!self._connectReq); + assert.ok(!self._connecting); - self._connectReq = self._handle.connect(ip, port); + var connectReq = self._handle.connect(ip, port); - if (self._connectReq) { - self._connectReq.oncomplete = afterConnect; + if (connectReq) { + self._connecting = true; + connectReq.oncomplete = afterConnect; } else { self.destroy(errnoException(errno, 'connect')); } @@ -275,6 +322,9 @@ function afterConnect(status, handle, req) { var self = handle.socket; assert.equal(handle, self._handle); + assert.ok(self._connecting); + self._connecting = false; + if (status == 0) { self.readable = self.writable = true; timers.active(self); @@ -320,7 +370,7 @@ function Server(/* [ options, ] listener */) { this.on('connection', listenerCallback); this.connections = 0; - self.allowHalfOpen = options.allowHalfOpen || false; + this.allowHalfOpen = options.allowHalfOpen || false; this._handle = new TCP(); @@ -376,7 +426,10 @@ function onconnection(clientHandle) { var handle = this; var self = handle.socket; - var socket = new Socket({ handle: clientHandle }); + var socket = new Socket({ + handle: clientHandle, + allowHalfOpen: self.allowHalfOpen + }); socket.readable = socket.writable = true; socket.resume(); @@ -387,3 +440,7 @@ function onconnection(clientHandle) { self.emit('connection', socket); } + +Server.prototype.close = function() { + this._handle.close(); +}; -- 2.7.4