From 23204979926a3f69144be182786c5d2dcf58c0c2 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Sat, 20 Nov 2010 20:49:44 -0800 Subject: [PATCH] Revert "Merge branch 'writev'" This reverts commit cd9515efd99dfa6510e72342a2621bb4b291a89c, reversing changes made to df46c8e698b9400abaabd77ec836c7cdadf9735c. Too slow. Needs more work. --- lib/http.js | 4 +- lib/net.js | 322 ++++++++++++++++++----------- src/node.cc | 2 - src/node.js | 4 - src/node_buffer.cc | 8 +- src/node_buffer.h | 4 +- src/node_io_watcher.cc | 438 +--------------------------------------- src/node_io_watcher.h | 8 - src/node_net.cc | 2 +- test/fixtures/recvfd.js | 16 +- test/simple/test-dumper-unix.js | 135 ------------- test/simple/test-dumper.js | 128 ------------ test/simple/test-pipe.js | 13 +- test/simple/test-sendfd.js | 17 +- 14 files changed, 237 insertions(+), 864 deletions(-) delete mode 100644 test/simple/test-dumper-unix.js delete mode 100644 test/simple/test-dumper.js diff --git a/lib/http.js b/lib/http.js index 70585ca..b0e86f2 100644 --- a/lib/http.js +++ b/lib/http.js @@ -827,8 +827,8 @@ function connectionListener (socket) { // No more messages to be pushed out. // HACK: need way to do this with socket interface - if (socket._writeWatcher.firstBucket) { - socket.__destroyOnDrain = true; + if (socket._writeQueue.length) { + socket.__destroyOnDrain = true; //socket.end(); } else { socket.destroy(); } diff --git a/lib/net.js b/lib/net.js index f5abf50..6373b40 100644 --- a/lib/net.js +++ b/lib/net.js @@ -54,33 +54,6 @@ var ioWatchers = new FreeList("iowatcher", 100, function () { return new IOWatcher(); }); - -IOWatcher.prototype.ondrain = function () { - if (this.socket) { - var socket = this.socket; - - socket._haveTriedFlush = false; - - if (socket.writable || socket.readable) { - require('timers').active(socket); - } - - socket.emit('drain'); - if (socket.ondrain) socket.ondrain(); - - if (socket._eof) socket._shutdown(); - } -}; - - -IOWatcher.prototype.onerror = function (errno) { - assert(this.socket); - var e = errnoException(errno, 'write'); - e.message += " fd=" + this.socket.fd; - this.socket.destroy(e); -}; - - exports.isIP = binding.isIP; exports.isIPv4 = function (input) { @@ -119,6 +92,16 @@ function setImplmentationMethods (self) { }; if (self.type == 'unix') { + self._writeImpl = function (buf, off, len, fd, flags) { + // Detect and disallow zero-byte writes wth an attached file + // descriptor. This is an implementation limitation of sendmsg(2). + if (fd && noData(buf, off, len)) { + throw new Error('File descriptors can only be written with data'); + } + + return sendMsg(self.fd, buf, off, len, fd, flags); + }; + self._readImpl = function (buf, off, len) { var bytesRead = recvMsg(self.fd, buf, off, len); @@ -140,21 +123,36 @@ function setImplmentationMethods (self) { return bytesRead; }; } else { + self._writeImpl = function (buf, off, len, fd, flags) { + // XXX: TLS support requires that 0-byte writes get processed + // by the kernel for some reason. Otherwise, we'd just + // fast-path return here. + + // Drop 'fd' and 'flags' as these are not supported by the write(2) + // system call + return write(self.fd, buf, off, len); + }; + self._readImpl = function (buf, off, len) { return read(self.fd, buf, off, len); }; } + + self._shutdownImpl = function () { + shutdown(self.fd, 'write'); + }; + }; -function onReadable (readable, writable) { +function onReadable (readable, writeable) { assert(this.socket); var socket = this.socket; socket._onReadable(); } -function onWritable (readable, writable) { +function onWritable (readable, writeable) { assert(this.socket); var socket = this.socket; if (socket._connecting) { @@ -169,7 +167,11 @@ function initStream (self) { self._readWatcher.socket = self; self._readWatcher.callback = onReadable; self.readable = false; - self._eof = false; + + // Queue of buffers and string that need to be written to socket. + self._writeQueue = []; + self._writeQueueEncoding = []; + self._writeQueueFD = []; self._writeWatcher = ioWatchers.alloc(); self._writeWatcher.socket = self; @@ -211,11 +213,6 @@ Stream.prototype._onTimeout = function () { }; -Stream.prototype.writeQueueSize = function () { - return this._writeWatcher.queueSize || 0; -}; - - Stream.prototype.open = function (fd, type) { initStream(this); @@ -225,10 +222,6 @@ Stream.prototype.open = function (fd, type) { setImplmentationMethods(this); - if (this.type === "unix") { - this._writeWatcher.isUnixSocket = true; - } - this._writeWatcher.set(this.fd, false, true); this.writable = true; }; @@ -262,79 +255,182 @@ Object.defineProperty(Stream.prototype, 'readyState', { }); -Stream.prototype._appendBucket = function (data, encoding, fd, callback) { - if (data.length != 0) { - // TODO reject empty data. - var newBucket = { data: data }; - if (encoding) newBucket.encoding = encoding; - if (fd) newBucket.fd = fd; - if (callback) newBucket.callback = callback; +// Returns true if all the data was flushed to socket. Returns false if +// something was queued. If data was queued, then the "drain" event will +// signal when it has been finally flushed to socket. +Stream.prototype.write = function (data, encoding, fd) { + if (this._connecting || (this._writeQueue && this._writeQueue.length)) { + if (!this._writeQueue) { + this._writeQueue = []; + this._writeQueueEncoding = []; + this._writeQueueFD = []; + } - // TODO properly calculate queueSize + // Slow. There is already a write queue, so let's append to it. + if (this._writeQueueLast() === END_OF_FILE) { + throw new Error('Stream.end() called already; cannot write.'); + } - if (this._writeWatcher.lastBucket) { - this._writeWatcher.lastBucket.next = newBucket; + if (typeof data == 'string' && + this._writeQueue.length && + typeof this._writeQueue[this._writeQueue.length-1] === 'string' && + this._writeQueueEncoding[this._writeQueueEncoding.length-1] === encoding) { + // optimization - concat onto last + this._writeQueue[this._writeQueue.length-1] += data; } else { - this._writeWatcher.firstBucket = newBucket; + this._writeQueue.push(data); + this._writeQueueEncoding.push(encoding); + } + + if (fd != undefined) { + this._writeQueueFD.push(fd); } - this._writeWatcher.lastBucket = newBucket; + return false; + } else { + // Fast. + // The most common case. There is no write queue. Just push the data + // directly to the socket. + return this._writeOut(data, encoding, fd); } +}; - if (this._writeWatcher.queueSize === undefined) { - this._writeWatcher.queueSize = 0; +// Directly writes the data to socket. +// +// Steps: +// 1. If it's a string, write it to the `pool`. (If not space remains +// on the pool make a new one.) +// 2. Write data to socket. Return true if flushed. +// 3. Slice out remaining +// 4. Unshift remaining onto _writeQueue. Return false. +Stream.prototype._writeOut = function (data, encoding, fd) { + if (!this.writable) { + throw new Error('Stream is not writable'); } - assert(this._writeWatcher.queueSize >= 0); - this._writeWatcher.queueSize += data.length; - return this._writeWatcher.queueSize; -}; + var buffer, off, len; + var bytesWritten, charsWritten; + var queuedData = false; + + if (typeof data != 'string') { + // 'data' is a buffer, ignore 'encoding' + buffer = data; + off = 0; + len = data.length; + } else { + assert(typeof data == 'string'); + + if (!pool || pool.length - pool.used < kMinPoolSpace) { + pool = null; + allocNewPool(); + } + + if (!encoding || encoding == 'utf8' || encoding == 'utf-8') { + // default to utf8 + bytesWritten = pool.write(data, 'utf8', pool.used); + charsWritten = Buffer._charsWritten; + } else { + bytesWritten = pool.write(data, encoding, pool.used); + charsWritten = bytesWritten; + } -Stream.prototype.write = function (data /* encoding, fd, callback */) { - if (this._eof) { - throw new Error('Stream.end() called already; cannot write.'); + if (encoding && data.length > 0) { + assert(bytesWritten > 0); + } + + buffer = pool; + len = bytesWritten; + off = pool.used; + + pool.used += bytesWritten; + + debug('wrote ' + bytesWritten + ' bytes to pool'); + + if (charsWritten != data.length) { + //debug("couldn't fit " + (data.length - charsWritten) + " bytes into the pool\n"); + // Unshift whatever didn't fit onto the buffer + this._writeQueue.unshift(data.slice(charsWritten)); + this._writeQueueEncoding.unshift(encoding); + this._writeWatcher.start(); + queuedData = true; + } } - if (!this.writable) { - throw new Error('Stream is not writable'); + try { + bytesWritten = this._writeImpl(buffer, off, len, fd, 0); + } catch (e) { + this.destroy(e); + return false; } - // parse the arguments. ugly. + debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n"); - var encoding, fd, callback; + require('timers').active(this); - if (arguments[1] === undefined || typeof arguments[1] == 'string') { - encoding = arguments[1]; - if (typeof arguments[2] == 'number') { - fd = arguments[2]; - callback = arguments[3]; + if (bytesWritten == len) { + // awesome. sent to buffer. + if (buffer === pool) { + // If we're just writing from the pool then we can make a little + // optimization and save the space. + buffer.used -= len; + } + + if (queuedData) { + return false; } else { - callback = arguments[2]; + return true; } - } else if (typeof arguments[1] == 'number') { - fd = arguments[1]; - callback = arguments[2]; - } else if (typeof arguments[1] == 'function') { - callback = arguments[1]; - } else { - throw new Error("Bad type for second argument"); } + // Didn't write the entire thing to buffer. + // Need to wait for the socket to become available before trying again. + this._writeWatcher.start(); - var queueSize = this._appendBucket(data, encoding, fd, callback); + // Slice out the data left. + var leftOver = buffer.slice(off + bytesWritten, off + len); + leftOver.used = leftOver.length; // used the whole thing... - if (this._connecting) return false; + // util.error('data.used = ' + data.used); + //if (!this._writeQueue) initWriteStream(this); - this._onWritable(); // Insert writeWatcher into the dumpQueue - require('timers').active(this); + // data should be the next thing to write. + this._writeQueue.unshift(leftOver); + this._writeQueueEncoding.unshift(null); - if (queueSize > (64*1024) && !this._haveTriedFlush) { - IOWatcher.flush(); - this._haveTriedFlush = true; + // If didn't successfully write any bytes, enqueue our fd and try again + if (!bytesWritten) { + this._writeQueueFD.unshift(fd); } - return queueSize < (64*1024); + return false; +}; + + +// Flushes the write buffer out. +// Returns true if the entire buffer was flushed. +Stream.prototype.flush = function () { + while (this._writeQueue && this._writeQueue.length) { + var data = this._writeQueue.shift(); + var encoding = this._writeQueueEncoding.shift(); + var fd = this._writeQueueFD.shift(); + + if (data === END_OF_FILE) { + this._shutdown(); + return true; + } + + var flushed = this._writeOut(data,encoding,fd); + if (!flushed) return false; + } + if (this._writeWatcher) this._writeWatcher.stop(); + return true; +}; + + +Stream.prototype._writeQueueLast = function () { + return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1] + : null; }; @@ -377,12 +473,6 @@ Stream.prototype._onConnect = function () { this._connecting = false; this.resume(); this.readable = this.writable = true; - - if (this._writeWatcher.firstBucket) { - // Flush this in case any writes are queued up while connecting. - this._onWritable(); - } - try { this.emit('connect'); } catch (e) { @@ -390,6 +480,12 @@ Stream.prototype._onConnect = function () { return; } + + if (this._writeQueue && this._writeQueue.length) { + // Flush this in case any writes are queued up while connecting. + this._onWritable(); + } + } else if (errno != EINPROGRESS) { this.destroy(errnoException(errno, 'connect')); } @@ -397,10 +493,11 @@ Stream.prototype._onConnect = function () { Stream.prototype._onWritable = function () { - // Stick it into the dumpQueue - if (!this._writeWatcher.next) { - this._writeWatcher.next = IOWatcher.dumpQueue.next; - IOWatcher.dumpQueue.next = this._writeWatcher; + // Stream becomes writable on connect() but don't flush if there's + // nothing actually to write + if (this.flush()) { + if (this._events && this._events['drain']) this.emit("drain"); + if (this.ondrain) this.ondrain(); // Optimization } }; @@ -541,12 +638,7 @@ Stream.prototype.pause = function () { Stream.prototype.resume = function () { - if (this.fd === null) { - // TODO, FIXME: throwing here breaks test/simple/test-pipe.js - // throw new Error('Cannot resume() closed Stream.'); - return; - } - this._readWatcher.stop(); + if (this.fd === null) throw new Error('Cannot resume() closed Stream.'); this._readWatcher.set(this.fd, true, false); this._readWatcher.start(); }; @@ -556,14 +648,15 @@ Stream.prototype.destroy = function (exception) { // pool is shared between sockets, so don't need to free it here. var self = this; - this._eof = this.readable = this.writable = false; + // TODO would like to set _writeQueue to null to avoid extra object alloc, + // but lots of code assumes this._writeQueue is always an array. + this._writeQueue = []; + + this.readable = this.writable = false; if (this._writeWatcher) { this._writeWatcher.stop(); this._writeWatcher.socket = null; - this._writeWatcher.firstBucket = null; - this._writeWatcher.lastBucket = null; - this._writeWatcher.isUnixSocket = false; ioWatchers.free(this._writeWatcher); this._writeWatcher = null; } @@ -602,7 +695,7 @@ Stream.prototype._shutdown = function () { this.writable = false; try { - shutdown(this.fd, 'write'); + this._shutdownImpl(); } catch (e) { this.destroy(e); } @@ -615,14 +708,15 @@ Stream.prototype._shutdown = function () { Stream.prototype.end = function (data, encoding) { - if (!this.writable) return; // TODO this should throw error - if (this._eof) return; // TODO this should also throw error - - if (data) this._appendBucket(data, encoding); - this._eof = true; - - // If this isn't in the dumpQueue then we shutdown now. - if (!this._writeWatcher.firstBucket) this._shutdown(); + if (this.writable) { + if (this._writeQueueLast() !== END_OF_FILE) { + if (data) this.write(data, encoding); + this._writeQueue.push(END_OF_FILE); + if (!this._connecting) { + this.flush(); + } + } + } }; diff --git a/src/node.cc b/src/node.cc index cdcd66d..46c56e2 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1967,8 +1967,6 @@ int Start(int argc, char *argv[]) { Tick(); - IOWatcher::Dump(); - } while (need_tick_cb || ev_activecnt(EV_DEFAULT_UC) > 0); diff --git a/src/node.js b/src/node.js index ea445de..31e7da4 100644 --- a/src/node.js +++ b/src/node.js @@ -29,10 +29,6 @@ process.assert = function (x, msg) { var writeError = process.binding('stdio').writeError; -// Need to force-load this binding so that we can IOWatcher::Dump in -// src/node.cc -var IOWatcher = process.binding('io_watcher'); - // nextTick() var nextTickQueue = []; diff --git a/src/node_buffer.cc b/src/node_buffer.cc index 00a6676..a042c80 100644 --- a/src/node_buffer.cc +++ b/src/node_buffer.cc @@ -82,8 +82,7 @@ static size_t ByteLength (Handle string, enum encoding enc) { } -Local Buffer::New(Handle string, - Handle encoding) { +Handle Buffer::New(Handle string) { HandleScope scope; // get Buffer from global scope. @@ -92,9 +91,8 @@ Local Buffer::New(Handle string, assert(bv->IsFunction()); Local b = Local::Cast(bv); - Local argv[2] = { Local::New(string), - Local::New(encoding) }; - Local instance = b->NewInstance(2, argv); + Local argv[1] = { Local::New(string) }; + Local instance = b->NewInstance(1, argv); return scope.Close(instance); } diff --git a/src/node_buffer.h b/src/node_buffer.h index fa39b1e..79fa34d 100644 --- a/src/node_buffer.h +++ b/src/node_buffer.h @@ -25,9 +25,7 @@ class Buffer : public ObjectWrap { typedef void (*free_callback)(char *data, void *hint); // C++ API for constructing fast buffer - static v8::Local New( - v8::Handle string, - v8::Handle encoding = v8::Handle()); + static v8::Handle New(v8::Handle string); static void Initialize(v8::Handle target); static Buffer* New(size_t length); // public constructor diff --git a/src/node_io_watcher.cc b/src/node_io_watcher.cc index a947757..7e51615 100644 --- a/src/node_io_watcher.cc +++ b/src/node_io_watcher.cc @@ -2,44 +2,17 @@ #include #include -#include #include - -#include /* writev */ -#include -#include /* IOV_MAX */ - -#include -#include - - #include namespace node { using namespace v8; -static ev_prepare dumper; -static Persistent dump_queue; - Persistent IOWatcher::constructor_template; Persistent callback_symbol; -static Persistent next_sym; -static Persistent prev_sym; -static Persistent ondrain_sym; -static Persistent onerror_sym; -static Persistent data_sym; -static Persistent encoding_sym; -static Persistent offset_sym; -static Persistent fd_sym; -static Persistent is_unix_socket_sym; -static Persistent first_bucket_sym; -static Persistent last_bucket_sym; -static Persistent queue_size_sym; -static Persistent callback_sym; - void IOWatcher::Initialize(Handle target) { HandleScope scope; @@ -53,39 +26,9 @@ void IOWatcher::Initialize(Handle target) { NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop); NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set); - Local io_watcher = constructor_template->GetFunction(); - target->Set(String::NewSymbol("IOWatcher"), io_watcher); - - NODE_SET_METHOD(constructor_template->GetFunction(), - "flush", - IOWatcher::Flush); + target->Set(String::NewSymbol("IOWatcher"), constructor_template->GetFunction()); callback_symbol = NODE_PSYMBOL("callback"); - - next_sym = NODE_PSYMBOL("next"); - prev_sym = NODE_PSYMBOL("prev"); - ondrain_sym = NODE_PSYMBOL("ondrain"); - onerror_sym = NODE_PSYMBOL("onerror"); - first_bucket_sym = NODE_PSYMBOL("firstBucket"); - last_bucket_sym = NODE_PSYMBOL("lastBucket"); - queue_size_sym = NODE_PSYMBOL("queueSize"); - offset_sym = NODE_PSYMBOL("offset"); - fd_sym = NODE_PSYMBOL("fd"); - is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket"); - data_sym = NODE_PSYMBOL("data"); - encoding_sym = NODE_PSYMBOL("encoding"); - callback_sym = NODE_PSYMBOL("callback"); - - - ev_prepare_init(&dumper, IOWatcher::Dump); - ev_prepare_start(EV_DEFAULT_UC_ &dumper); - // Need to make sure that Dump runs *after* all other prepare watchers - - // in particular the next tick one. - ev_set_priority(&dumper, EV_MINPRI); - ev_unref(EV_DEFAULT_UC); - - dump_queue = Persistent::New(Object::New()); - io_watcher->Set(String::NewSymbol("dumpQueue"), dump_queue); } @@ -201,384 +144,5 @@ Handle IOWatcher::Set(const Arguments& args) { } -Handle IOWatcher::Flush(const Arguments& args) { - HandleScope scope; // unneccessary? - IOWatcher::Dump(); - return Undefined(); -} - -#define KB 1024 - -/* - * A large javascript object structure is built up in net.js. The function - * Dump is called at the end of each iteration, before select() is called, - * to push all the data out to sockets. - * - * The structure looks like this: - * - * IOWatcher . dumpQueue - * | - * watcher . buckets - b - b - b - b - * | - * watcher . buckets - b - b - * | - * watcher . buckets - b - * | - * watcher . buckets - b - b - b - * - * The 'b' nodes are little javascript objects buckets. Each has a 'data' - * member. 'data' is either a string or buffer. E.G. - * - * b = { data: "hello world" } - * - */ - -// To enable this debug output, add '-DDUMP_DEBUG' to CPPFLAGS -// in 'build/c4che/default.cache.py' and 'make clean all' -#ifdef DUMP_DEBUG -#define DEBUG_PRINT(fmt,...) \ - fprintf(stderr, "(dump:%d) " fmt "\n", __LINE__, ##__VA_ARGS__) -#else -#define DEBUG_PRINT(fmt,...) -#endif - - -void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) { - assert(revents == EV_PREPARE); - assert(w == &dumper); - Dump(); -} - - -void IOWatcher::Dump() { - HandleScope scope; - - static struct iovec iov[IOV_MAX]; - - // Loop over all watchers in the dump queue. Each one stands for a socket - // that has stuff to be written out. - // - // There are several possible outcomes for each watcher. - // 1. All the buckets associated with the watcher are written out. In this - // case the watcher is disabled; it is removed from the dump_queue. - // 2. Some of the data was written, but there still remains buckets. In - // this case the watcher is enabled (i.e. we wait for the file - // descriptor to become readable) and we remove it from the dump_queue. - // When it becomes readable, we'll get a callback in net.js and add it - // again to the dump_queue - // 3. writev returns EAGAIN. This is the same as case 2. - // - // In any case, the dump queue should be empty when we exit this function. - // (See the assert at the end of the outermost for loop. - Local watcher_v; - Local watcher; - - for (watcher_v = dump_queue->Get(next_sym); - watcher_v->IsObject(); - dump_queue->Set(next_sym, (watcher_v = watcher->Get(next_sym))), - watcher->Set(next_sym, Null())) { - watcher = watcher_v->ToObject(); - - IOWatcher *io = ObjectWrap::Unwrap(watcher); - - // stats (just for fun) - io->dumps_++; - io->last_dump_ = ev_now(EV_DEFAULT_UC); - - DEBUG_PRINT("<%d> dumping", io->watcher_.fd); - - // Number of items we've stored in iov - int iovcnt = 0; - // Number of bytes we've stored in iov - size_t to_write = 0; - - bool unix_socket = false; - if (watcher->Has(is_unix_socket_sym) && watcher->Get(is_unix_socket_sym)->IsTrue()) { - unix_socket = true; - } - - // Unix sockets don't like huge messages. TCP sockets do. - // TODO: handle EMSGSIZE after sendmsg(). - size_t max_to_write = unix_socket ? 8*KB : 256*KB; - - int fd_to_send = -1; - - // Offset is only as large as the first buffer of data. (See assert - // below) Offset > 0 occurs when a previous writev could not entirely - // drain a bucket. - size_t offset = 0; - if (watcher->Has(offset_sym)) { - offset = watcher->Get(offset_sym)->Uint32Value(); - } - size_t first_offset = offset; - - DEBUG_PRINT("<%d> offset=%ld", io->watcher_.fd, offset); - - // Loop over all the buckets for this particular watcher/socket in order - // to fill iov. - Local bucket_v; - Local bucket; - unsigned int bucket_index = 0; - - for (bucket_v = watcher->Get(first_bucket_sym); - // Break if we have an FD to send. - // sendmsg can only handle one FD at a time. - fd_to_send < 0 && - // break if we've hit the end - bucket_v->IsObject() && - // break if iov contains a lot of data - to_write < max_to_write && - // break if iov is running out of space - iovcnt < IOV_MAX; - bucket_v = bucket->Get(next_sym), bucket_index++) { - assert(bucket_v->IsObject()); - bucket = bucket_v->ToObject(); - - Local data_v = bucket->Get(data_sym); - // net.js will be setting this 'data' value. We can ensure that it is - // never empty. - assert(!data_v.IsEmpty()); - - Local buf_object; - - if (data_v->IsString()) { - // TODO: insert v8::String::Pointers() hack here. - Local s = data_v->ToString(); - Local e = bucket->Get(encoding_sym); - buf_object = Buffer::New(s, e); - bucket->Set(data_sym, buf_object); - } else { - assert(Buffer::HasInstance(data_v)); - buf_object = data_v->ToObject(); - } - - size_t l = Buffer::Length(buf_object); - - if (l == 0) continue; - - assert(first_offset < l); - iov[iovcnt].iov_base = Buffer::Data(buf_object) + first_offset; - iov[iovcnt].iov_len = l - first_offset; - to_write += iov[iovcnt].iov_len; - iovcnt++; - - first_offset = 0; // only the first buffer will be offset. - - if (unix_socket && bucket->Has(fd_sym)) { - Local fd_v = bucket->Get(fd_sym); - if (fd_v->IsInt32()) { - fd_to_send = fd_v->Int32Value(); - DEBUG_PRINT("<%d> got fd to send: %d", io->watcher_.fd, fd_to_send); - assert(fd_to_send >= 0); - } - } - } - - if (to_write > 0) { - ssize_t written; - - if (unix_socket) { - struct msghdr msg; - char scratch[64]; - - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = iov; - msg.msg_iovlen = iovcnt; - msg.msg_control = NULL; // void* - msg.msg_controllen = 0; // socklen_t - msg.msg_flags = 0; // int - - if (fd_to_send >= 0) { - struct cmsghdr *cmsg; - - msg.msg_control = (void *) scratch; - msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); - - cmsg = CMSG_FIRSTHDR(&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = msg.msg_controllen; - *(int*) CMSG_DATA(cmsg) = fd_to_send; - } - - written = sendmsg(io->watcher_.fd, &msg, 0); - } else { - written = writev(io->watcher_.fd, iov, iovcnt); - } - - DEBUG_PRINT("<%d> iovcnt: %d, to_write: %ld, written: %ld", - io->watcher_.fd, - iovcnt, - to_write, - written); - - if (written < 0) { - // Allow EAGAIN. - // TODO: handle EMSGSIZE after sendmsg(). - if (errno == EAGAIN) { - DEBUG_PRINT("<%d> EAGAIN", io->watcher_.fd); - io->Start(); - } else { - // Emit error event - if (watcher->Has(onerror_sym)) { - Local callback_v = io->handle_->Get(onerror_sym); - assert(callback_v->IsFunction()); - Local callback = Local::Cast(callback_v); - - Local argv[1] = { Integer::New(errno) }; - - TryCatch try_catch; - - callback->Call(io->handle_, 1, argv); - - if (try_catch.HasCaught()) { - FatalException(try_catch); - } - } - } - // Continue with the next watcher. - continue; - } - - // what about written == 0 ? - - size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value(); - DEBUG_PRINT("<%d> queue_size=%ld", io->watcher_.fd, queue_size); - assert(queue_size >= offset); - - // Now drop the buckets that have been written. - bucket_index = 0; - - while (written > 0) { - bucket_v = watcher->Get(first_bucket_sym); - if (!bucket_v->IsObject()) { - // No more buckets in the queue. Make sure the last_bucket_sym is - // updated and then go to the next watcher. - watcher->Set(last_bucket_sym, Null()); - break; - } - - bucket = bucket_v->ToObject(); - - Local data_v = bucket->Get(data_sym); - assert(!data_v.IsEmpty()); - - // At the moment we're turning all string into buffers - // so we assert that this is not a string. However, when the - // "Pointer patch" lands, this assert will need to be removed. - assert(!data_v->IsString()); - // When the "Pointer patch" lands, we will need to be careful - // to somehow store the length of strings that we're optimizing on - // so that it need not be recalculated here. Note the "Pointer patch" - // will only apply to ASCII strings - UTF8 ones will need to be - // serialized onto a buffer. - size_t bucket_len = Buffer::Length(data_v->ToObject()); - - if (unix_socket && bucket->Has(fd_sym)) { - bucket->Set(fd_sym, Null()); - } - - DEBUG_PRINT("<%d,%ld> bucket_len: %ld, offset: %ld", - io->watcher_.fd, - bucket_index, - bucket_len, - offset); - assert(bucket_len > offset); - - // Only on the first bucket does is the offset > 0. - if (offset + written < bucket_len) { - // we have not written the entire bucket - DEBUG_PRINT("<%d,%ld> Only wrote part of the buffer. " - "setting watcher.offset = %ld", - io->watcher_.fd, - bucket_index, - offset + written); - - watcher->Set(offset_sym, - Integer::NewFromUnsigned(offset + written)); - break; - } else { - DEBUG_PRINT("<%d,%ld> wrote the whole bucket. discarding.", - io->watcher_.fd, - bucket_index); - - assert(bucket_len <= queue_size); - queue_size -= bucket_len; - - assert(bucket_len - offset <= written); - written -= bucket_len - offset; - - Local bucket_callback_v = bucket->Get(callback_sym); - if (bucket_callback_v->IsFunction()) { - Local bucket_callback = - Local::Cast(bucket_callback_v); - TryCatch try_catch; - bucket_callback->Call(io->handle_, 0, NULL); - if (try_catch.HasCaught()) { - FatalException(try_catch); - } - } - - // Offset is now zero - watcher->Set(offset_sym, Integer::NewFromUnsigned(0)); - } - - offset = 0; // the next bucket will have zero offset; - bucket_index++; - - // unshift - watcher->Set(first_bucket_sym, bucket->Get(next_sym)); - } - - watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size)); - } - - - // Finished dumping the buckets. - // - // If our list of buckets is empty, we can emit 'drain' and forget about - // this socket. Nothing needs to be done. - // - // Otherwise we need to prepare the io_watcher to wait for the interface - // to become writable again. - - if (watcher->Get(first_bucket_sym)->IsObject()) { - // Still have buckets to be written. Wait for fd to become writable. - io->Start(); - - DEBUG_PRINT("<%d> Started watcher", io->watcher_.fd); - } else { - // No more buckets in the queue. Make sure the last_bucket_sym is - // updated and then go to the next watcher. - watcher->Set(last_bucket_sym, Null()); - - // Emptied the buckets queue for this socket. Don't wait for it to - // become writable. - io->Stop(); - - DEBUG_PRINT("<%d> Stop watcher", io->watcher_.fd); - - // Emit drain event - if (watcher->Has(ondrain_sym)) { - Local callback_v = io->handle_->Get(ondrain_sym); - assert(callback_v->IsFunction()); - Local callback = Local::Cast(callback_v); - - TryCatch try_catch; - - callback->Call(io->handle_, 0, NULL); - - if (try_catch.HasCaught()) { - FatalException(try_catch); - } - } - } - } - - // Assert that the dump_queue is empty. - assert(!dump_queue->Get(next_sym)->IsObject()); -} - } // namespace node diff --git a/src/node_io_watcher.h b/src/node_io_watcher.h index 71d7142..06d431e 100644 --- a/src/node_io_watcher.h +++ b/src/node_io_watcher.h @@ -10,7 +10,6 @@ namespace node { class IOWatcher : ObjectWrap { public: static void Initialize(v8::Handle target); - static void Dump(); protected: static v8::Persistent constructor_template; @@ -27,7 +26,6 @@ class IOWatcher : ObjectWrap { } static v8::Handle New(const v8::Arguments& args); - static v8::Handle Flush(const v8::Arguments& args); static v8::Handle Start(const v8::Arguments& args); static v8::Handle Stop(const v8::Arguments& args); static v8::Handle Set(const v8::Arguments& args); @@ -35,15 +33,9 @@ class IOWatcher : ObjectWrap { private: static void Callback(EV_P_ ev_io *watcher, int revents); - static void Dump(EV_P_ ev_prepare *watcher, int revents); - void Start(); void Stop(); - // stats. TODO: expose to js, add reset() method - uint64_t dumps_; - ev_tstamp last_dump_; - ev_io watcher_; }; diff --git a/src/node_net.cc b/src/node_net.cc index f068fc8..a74e2dc 100644 --- a/src/node_net.cc +++ b/src/node_net.cc @@ -3,7 +3,6 @@ #include #include -#include #include #include @@ -38,6 +37,7 @@ #define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a))) + namespace node { using namespace v8; diff --git a/test/fixtures/recvfd.js b/test/fixtures/recvfd.js index 8f06469..09b2864 100644 --- a/test/fixtures/recvfd.js +++ b/test/fixtures/recvfd.js @@ -22,33 +22,35 @@ function processData(s) { // version of our modified object back. Clean up when we're done. var pipeStream = new net.Stream(fd); - pipeStream.resume(); - - pipeStream.write(JSON.stringify(d) + '\n', function () { + var drainFunc = function() { pipeStream.destroy(); if (++numSentMessages == 2) { s.destroy(); } - }); + }; + + pipeStream.addListener('drain', drainFunc); + pipeStream.resume(); + + if (pipeStream.write(JSON.stringify(d) + '\n')) { + drainFunc(); + } }; // Create a UNIX socket to the path defined by argv[2] and read a file // descriptor and misc data from it. var s = new net.Stream(); - s.addListener('fd', function(fd) { receivedFDs.unshift(fd); processData(s); }); - s.addListener('data', function(data) { data.toString('utf8').trim().split('\n').forEach(function(d) { receivedData.unshift(JSON.parse(d)); }); processData(s); }); - s.connect(process.argv[2]); // vim:ts=2 sw=2 et diff --git a/test/simple/test-dumper-unix.js b/test/simple/test-dumper-unix.js deleted file mode 100644 index 2e51036..0000000 --- a/test/simple/test-dumper-unix.js +++ /dev/null @@ -1,135 +0,0 @@ -var assert =require('assert'); -var IOWatcher = process.binding('io_watcher').IOWatcher; -var errnoException = process.binding('net').errnoException; -var close = process.binding('net').close; -var net = require('net'); - -var ncomplete = 0; - -function test (N, b, cb) { - var fdsSent = 0; - var fdsRecv = 0; - //console.trace(); - var expected = N * b.length; - var nread = 0; - - // Create a socketpair - var fds = process.binding('net').socketpair(); - - // Use writev/dumper to send data down the one of the sockets, fds[1]. - // This requires a IOWatcher. - var w = new IOWatcher(); - w.set(fds[1], false, true); - w.isUnixSocket = true; - - w.callback = function (readable, writable) { - assert.ok(!readable && writable); // not really important. - // Insert watcher into dumpQueue - w.next = IOWatcher.dumpQueue.next; - IOWatcher.dumpQueue.next = w; - } - - var ndrain = 0; - w.ondrain = function () { - ndrain++; - } - - var nerror = 0; - w.onerror = function (errno) { - throw errnoException(errno); - nerror++; - } - - // The read end, fds[0], will be used to count how much comes through. - // This sets up a readable stream on fds[0]. - var stream = new net.Stream({ fd: fds[0], type: 'unix' }); - //stream.readable = true; - stream.resume(); - - stream.on('fd', function (fd) { - console.log('got fd %d', fd); - fdsRecv++; - }); - - // Count the data as it arrives on the other end - stream.on('data', function (d) { - nread += d.length; - - if (nread >= expected) { - assert.ok(nread === expected); - assert.equal(1, ndrain); - assert.equal(0, nerror); - console.error("done. wrote %d bytes\n", nread); - close(fds[1]); - } - }); - - - stream.on('close', function () { - assert.equal(fdsSent, fdsRecv); - // check to make sure the watcher isn't in the dump queue. - for (var x = IOWatcher.dumpQueue; x; x = x.next) { - assert.ok(x !== w); - } - assert.equal(null, w.next); - // completely flushed - assert.ok(!w.firstBucket); - assert.ok(!w.lastBucket); - - ncomplete++; - if (cb) cb(); - }); - - - // Insert watcher into dumpQueue - w.next = IOWatcher.dumpQueue.next; - IOWatcher.dumpQueue.next = w; - - w.firstBucket = { data: b }; - w.lastBucket = w.firstBucket; - w.queueSize = b.length; - - for (var i = 0; i < N-1; i++) { - var bucket = { data: b }; - w.lastBucket.next = bucket; - w.lastBucket = bucket; - w.queueSize += b.length; - // Kind of randomly fill these buckets with fds. - if (fdsSent < 5 && i % 2 == 0) { - bucket.fd = 1; // send stdout - fdsSent++; - } - } -} - - -function runTests (values) { - expectedToComplete = values.length; - - function go () { - if (ncomplete < values.length) { - var v = values[ncomplete]; - console.log("test N=%d, size=%d", v[0], v[1].length); - test(v[0], v[1], go); - } - } - - go(); -} - -runTests([ [30, Buffer(1000)] - , [4, Buffer(10000)] - , [1, "hello world\n"] - , [50, Buffer(1024*1024)] - , [500, Buffer(40960+1)] - , [500, Buffer(40960-1)] - , [500, Buffer(40960)] - , [500, Buffer(1024*1024+1)] - , [50000, "hello world\n"] - ]); - - -process.on('exit', function () { - assert.equal(expectedToComplete, ncomplete); -}); - diff --git a/test/simple/test-dumper.js b/test/simple/test-dumper.js deleted file mode 100644 index c159774..0000000 --- a/test/simple/test-dumper.js +++ /dev/null @@ -1,128 +0,0 @@ -var assert =require('assert'); -var IOWatcher = process.binding('io_watcher').IOWatcher; -var errnoException = process.binding('net').errnoException; -var close = process.binding('net').close; -var net = require('net'); - -var ncomplete = 0; - - - - - -function test (N, b, cb) { - //console.trace(); - var expected = N * b.length; - var nread = 0; - - // Create a pipe - var fds = process.binding('net').pipe(); - console.log("fds == %j", fds); - - // Use writev/dumper to send data down the write end of the pipe, fds[1]. - // This requires a IOWatcher. - var w = new IOWatcher(); - w.set(fds[1], false, true); - - w.callback = function (readable, writable) { - assert.ok(!readable && writable); // not really important. - // Insert watcher into dumpQueue - w.next = IOWatcher.dumpQueue.next; - IOWatcher.dumpQueue.next = w; - } - - var ndrain = 0; - w.ondrain = function () { - ndrain++; - } - - var nerror = 0; - w.onerror = function (errno) { - throw errnoException(errno); - nerror++; - } - - // The read end, fds[0], will be used to count how much comes through. - // This sets up a readable stream on fds[0]. - var stream = new net.Stream(); - stream.open(fds[0]); - stream.readable = true; - stream.resume(); - - // Count the data as it arrives on the read end of the pipe. - stream.on('data', function (d) { - nread += d.length; - - if (nread >= expected) { - assert.ok(nread === expected); - assert.equal(1, ndrain); - assert.equal(0, nerror); - console.error("done. wrote %d bytes\n", nread); - close(fds[1]); - } - }); - - stream.on('close', function () { - // check to make sure the watcher isn't in the dump queue. - for (var x = IOWatcher.dumpQueue; x; x = x.next) { - assert.ok(x !== w); - } - assert.equal(null, w.next); - // completely flushed - assert.ok(!w.firstBucket); - assert.ok(!w.lastBucket); - - ncomplete++; - if (cb) cb(); - }); - - - // Insert watcher into dumpQueue - w.next = IOWatcher.dumpQueue.next; - IOWatcher.dumpQueue.next = w; - - w.firstBucket = { data: b }; - w.lastBucket = w.firstBucket; - w.queueSize = b.length; - - for (var i = 0; i < N-1; i++) { - var bucket = { data: b }; - assert.ok(!w.lastBucket.next); - w.lastBucket.next = bucket; - w.lastBucket = bucket; - w.queueSize += b.length; - } -} - - -function runTests (values) { - expectedToComplete = values.length; - - function go () { - if (ncomplete < values.length) { - var v = values[ncomplete]; - console.log("test N=%d, size=%d", v[0], v[1].length); - test(v[0], v[1], go); - } - } - - go(); -} - -runTests([ [3, Buffer(1000)], - [30, Buffer(1000)], - [4, Buffer(10000)], - [1, "hello world\n"], - [50, Buffer(1024*1024)], - [500, Buffer(40960+1)], - [500, Buffer(40960-1)], - [500, Buffer(40960)], - [500, Buffer(1024*1024+1)], - [50000, "hello world\n"] - ]); - - -process.on('exit', function () { - assert.equal(expectedToComplete, ncomplete); -}); - diff --git a/test/simple/test-pipe.js b/test/simple/test-pipe.js index d12c2b1..75db48e 100644 --- a/test/simple/test-pipe.js +++ b/test/simple/test-pipe.js @@ -17,22 +17,20 @@ var bufferSize = 5 * 1024 * 1024; */ var buffer = Buffer(bufferSize); for (var i = 0; i < buffer.length; i++) { - buffer[i] = 100; //parseInt(Math.random()*10000) % 256; + buffer[i] = parseInt(Math.random()*10000) % 256; } var web = http.Server(function (req, res) { web.close(); - console.log("web server connection fd=%d", req.connection.fd); - console.log(req.headers); var socket = net.Stream(); socket.connect(tcpPort); socket.on('connect', function () { - console.log('http->tcp connected fd=%d', socket.fd); + console.log('socket connected'); }); req.pipe(socket); @@ -56,7 +54,7 @@ web.listen(webPort, startClient); var tcp = net.Server(function (s) { tcp.close(); - console.log("tcp server connection fd=%d", s.fd); + console.log("tcp server connection"); var i = 0; @@ -93,11 +91,6 @@ function startClient () { req.write(buffer); req.end(); - console.log("request fd=%d", req.connection.fd); - - // note the queue includes http headers. - assert.ok(req.connection.writeQueueSize() > buffer.length); - req.on('response', function (res) { console.log('Got response'); res.setEncoding('utf8'); diff --git a/test/simple/test-sendfd.js b/test/simple/test-sendfd.js index 8052a13..7ed7b02 100644 --- a/test/simple/test-sendfd.js +++ b/test/simple/test-sendfd.js @@ -53,7 +53,7 @@ var logChild = function(d) { d.split('\n').forEach(function(l) { if (l.length > 0) { - console.error('CHILD: ' + l); + common.debug('CHILD: ' + l); } }); }; @@ -96,18 +96,19 @@ var srv = net.createServer(function(s) { buf.write(JSON.stringify(DATA) + '\n', 'utf8'); s.write(str, 'utf8', pipeFDs[1]); - - s.write(buf, pipeFDs[1], function () { - console.error("close pipeFDs[1]"); + if (s.write(buf, undefined, pipeFDs[1])) { netBinding.close(pipeFDs[1]); - }); + } else { + s.addListener('drain', function() { + netBinding.close(pipeFDs[1]); + }); + } }); srv.listen(SOCK_PATH); // Spawn a child running test/fixtures/recvfd.js -var cp = child_process.spawn(process.execPath, - [path.join(common.fixturesDir, 'recvfd.js'), - SOCK_PATH]); +var cp = child_process.spawn(process.argv[0], + [path.join(common.fixturesDir, 'recvfd.js'), SOCK_PATH]); cp.stdout.addListener('data', logChild); cp.stderr.addListener('data', logChild); -- 2.7.4