Revert "Merge branch 'writev'"
authorRyan Dahl <ry@tinyclouds.org>
Sun, 21 Nov 2010 04:49:44 +0000 (20:49 -0800)
committerRyan Dahl <ry@tinyclouds.org>
Sun, 21 Nov 2010 04:55:15 +0000 (20:55 -0800)
This reverts commit cd9515efd99dfa6510e72342a2621bb4b291a89c, reversing
changes made to df46c8e698b9400abaabd77ec836c7cdadf9735c.

Too slow. Needs more work.

14 files changed:
lib/http.js
lib/net.js
src/node.cc
src/node.js
src/node_buffer.cc
src/node_buffer.h
src/node_io_watcher.cc
src/node_io_watcher.h
src/node_net.cc
test/fixtures/recvfd.js
test/simple/test-dumper-unix.js [deleted file]
test/simple/test-dumper.js [deleted file]
test/simple/test-pipe.js
test/simple/test-sendfd.js

index 70585ca..b0e86f2 100644 (file)
@@ -827,8 +827,8 @@ function connectionListener (socket) {
       // No more messages to be pushed out.
 
       // HACK: need way to do this with socket interface
-      if (socket._writeWatcher.firstBucket) {
-        socket.__destroyOnDrain = true;
+      if (socket._writeQueue.length) {
+        socket.__destroyOnDrain = true; //socket.end();
       } else {
         socket.destroy();
       }
index f5abf50..6373b40 100644 (file)
@@ -54,33 +54,6 @@ var ioWatchers = new FreeList("iowatcher", 100, function () {
   return new IOWatcher();
 });
 
-
-IOWatcher.prototype.ondrain = function () {
-  if (this.socket) {
-    var socket = this.socket;
-
-    socket._haveTriedFlush = false;
-
-    if (socket.writable || socket.readable) {
-      require('timers').active(socket);
-    }
-
-    socket.emit('drain');
-    if (socket.ondrain) socket.ondrain();
-
-    if (socket._eof) socket._shutdown();
-  }
-};
-
-
-IOWatcher.prototype.onerror = function (errno) {
-  assert(this.socket);
-  var e = errnoException(errno, 'write');
-  e.message += " fd=" + this.socket.fd;
-  this.socket.destroy(e);
-};
-
-
 exports.isIP = binding.isIP;
 
 exports.isIPv4 = function (input) {
@@ -119,6 +92,16 @@ function setImplmentationMethods (self) {
   };
 
   if (self.type == 'unix') {
+    self._writeImpl = function (buf, off, len, fd, flags) {
+      // Detect and disallow zero-byte writes wth an attached file
+      // descriptor. This is an implementation limitation of sendmsg(2).
+      if (fd && noData(buf, off, len)) {
+        throw new Error('File descriptors can only be written with data');
+      }
+
+      return sendMsg(self.fd, buf, off, len, fd, flags);
+    };
+
     self._readImpl = function (buf, off, len) {
       var bytesRead = recvMsg(self.fd, buf, off, len);
 
@@ -140,21 +123,36 @@ function setImplmentationMethods (self) {
       return bytesRead;
     };
   } else {
+    self._writeImpl = function (buf, off, len, fd, flags) {
+      // XXX: TLS support requires that 0-byte writes get processed
+      //      by the kernel for some reason. Otherwise, we'd just
+      //      fast-path return here.
+
+      // Drop 'fd' and 'flags' as these are not supported by the write(2)
+      // system call
+      return write(self.fd, buf, off, len);
+    };
+
     self._readImpl = function (buf, off, len) {
       return read(self.fd, buf, off, len);
     };
   }
+
+  self._shutdownImpl = function () {
+    shutdown(self.fd, 'write');
+  };
+
 };
 
 
-function onReadable (readable, writable) {
+function onReadable (readable, writeable) {
   assert(this.socket);
   var socket = this.socket;
   socket._onReadable();
 }
 
 
-function onWritable (readable, writable) {
+function onWritable (readable, writeable) {
   assert(this.socket);
   var socket = this.socket;
   if (socket._connecting) {
@@ -169,7 +167,11 @@ function initStream (self) {
   self._readWatcher.socket = self;
   self._readWatcher.callback = onReadable;
   self.readable = false;
-  self._eof = false;
+
+  // Queue of buffers and string that need to be written to socket.
+  self._writeQueue = [];
+  self._writeQueueEncoding = [];
+  self._writeQueueFD = [];
 
   self._writeWatcher = ioWatchers.alloc();
   self._writeWatcher.socket = self;
@@ -211,11 +213,6 @@ Stream.prototype._onTimeout = function () {
 };
 
 
-Stream.prototype.writeQueueSize = function () {
-  return this._writeWatcher.queueSize || 0;
-};
-
-
 Stream.prototype.open = function (fd, type) {
   initStream(this);
 
@@ -225,10 +222,6 @@ Stream.prototype.open = function (fd, type) {
 
   setImplmentationMethods(this);
 
-  if (this.type === "unix") {
-    this._writeWatcher.isUnixSocket = true;
-  }
-
   this._writeWatcher.set(this.fd, false, true);
   this.writable = true;
 };
@@ -262,79 +255,182 @@ Object.defineProperty(Stream.prototype, 'readyState', {
 });
 
 
-Stream.prototype._appendBucket = function (data, encoding, fd, callback) {
-  if (data.length != 0) {
-    // TODO reject empty data.
-    var newBucket =  { data: data };
-    if (encoding) newBucket.encoding = encoding;
-    if (fd) newBucket.fd = fd;
-    if (callback) newBucket.callback = callback;
+// Returns true if all the data was flushed to socket. Returns false if
+// something was queued. If data was queued, then the "drain" event will
+// signal when it has been finally flushed to socket.
+Stream.prototype.write = function (data, encoding, fd) {
+  if (this._connecting || (this._writeQueue && this._writeQueue.length)) {
+    if (!this._writeQueue) {
+      this._writeQueue = [];
+      this._writeQueueEncoding = [];
+      this._writeQueueFD = [];
+    }
 
-    // TODO properly calculate queueSize
+    // Slow. There is already a write queue, so let's append to it.
+    if (this._writeQueueLast() === END_OF_FILE) {
+      throw new Error('Stream.end() called already; cannot write.');
+    }
 
-    if (this._writeWatcher.lastBucket) {
-      this._writeWatcher.lastBucket.next = newBucket;
+    if (typeof data == 'string' &&
+        this._writeQueue.length &&
+        typeof this._writeQueue[this._writeQueue.length-1] === 'string' &&
+        this._writeQueueEncoding[this._writeQueueEncoding.length-1] === encoding) {
+      // optimization - concat onto last
+      this._writeQueue[this._writeQueue.length-1] += data;
     } else {
-      this._writeWatcher.firstBucket = newBucket;
+      this._writeQueue.push(data);
+      this._writeQueueEncoding.push(encoding);
+    }
+
+    if (fd != undefined) {
+      this._writeQueueFD.push(fd);
     }
 
-    this._writeWatcher.lastBucket = newBucket;
+    return false;
+  } else {
+    // Fast.
+    // The most common case. There is no write queue. Just push the data
+    // directly to the socket.
+    return this._writeOut(data, encoding, fd);
   }
+};
 
-  if (this._writeWatcher.queueSize === undefined) {
-    this._writeWatcher.queueSize = 0;
+// Directly writes the data to socket.
+//
+// Steps:
+//   1. If it's a string, write it to the `pool`. (If not space remains
+//      on the pool make a new one.)
+//   2. Write data to socket. Return true if flushed.
+//   3. Slice out remaining
+//   4. Unshift remaining onto _writeQueue. Return false.
+Stream.prototype._writeOut = function (data, encoding, fd) {
+  if (!this.writable) {
+    throw new Error('Stream is not writable');
   }
-  assert(this._writeWatcher.queueSize >= 0);
-  this._writeWatcher.queueSize += data.length;
 
-  return this._writeWatcher.queueSize;
-};
+  var buffer, off, len;
+  var bytesWritten, charsWritten;
+  var queuedData = false;
+
+  if (typeof data != 'string') {
+    // 'data' is a buffer, ignore 'encoding'
+    buffer = data;
+    off = 0;
+    len = data.length;
 
+  } else {
+    assert(typeof data == 'string');
+
+    if (!pool || pool.length - pool.used < kMinPoolSpace) {
+      pool = null;
+      allocNewPool();
+    }
+
+    if (!encoding || encoding == 'utf8' || encoding == 'utf-8') {
+      // default to utf8
+      bytesWritten = pool.write(data, 'utf8', pool.used);
+      charsWritten = Buffer._charsWritten;
+    } else {
+      bytesWritten = pool.write(data, encoding, pool.used);
+      charsWritten = bytesWritten;
+    }
 
-Stream.prototype.write = function (data /* encoding, fd, callback */) {
-  if (this._eof) {
-    throw new Error('Stream.end() called already; cannot write.');
+    if (encoding && data.length > 0) {
+      assert(bytesWritten > 0);
+    }
+
+    buffer = pool;
+    len = bytesWritten;
+    off = pool.used;
+
+    pool.used += bytesWritten;
+
+    debug('wrote ' + bytesWritten + ' bytes to pool');
+
+    if (charsWritten != data.length) {
+      //debug("couldn't fit " + (data.length - charsWritten) + " bytes into the pool\n");
+      // Unshift whatever didn't fit onto the buffer
+      this._writeQueue.unshift(data.slice(charsWritten));
+      this._writeQueueEncoding.unshift(encoding);
+      this._writeWatcher.start();
+      queuedData = true;
+    }
   }
 
-  if (!this.writable) {
-    throw new Error('Stream is not writable');
+  try {
+    bytesWritten = this._writeImpl(buffer, off, len, fd, 0);
+  } catch (e) {
+    this.destroy(e);
+    return false;
   }
 
-  // parse the arguments. ugly.
+  debug('wrote ' + bytesWritten + ' to socket. [fd, off, len] = ' + JSON.stringify([this.fd, off, len]) + "\n");
 
-  var encoding, fd, callback;
+  require('timers').active(this);
 
-  if (arguments[1] === undefined || typeof arguments[1] == 'string') {
-    encoding = arguments[1];
-    if (typeof arguments[2] == 'number') {
-      fd = arguments[2];
-      callback = arguments[3];
+  if (bytesWritten == len) {
+    // awesome. sent to buffer.
+    if (buffer === pool) {
+      // If we're just writing from the pool then we can make a little
+      // optimization and save the space.
+      buffer.used -= len;
+    }
+
+    if (queuedData) {
+      return false;
     } else {
-      callback = arguments[2];
+      return true;
     }
-  } else if (typeof arguments[1] == 'number') {
-    fd = arguments[1];
-    callback = arguments[2];
-  } else if (typeof arguments[1] == 'function') {
-    callback = arguments[1];
-  } else {
-    throw new Error("Bad type for second argument");
   }
 
+  // Didn't write the entire thing to buffer.
+  // Need to wait for the socket to become available before trying again.
+  this._writeWatcher.start();
 
-  var queueSize = this._appendBucket(data, encoding, fd, callback);
+  // Slice out the data left.
+  var leftOver = buffer.slice(off + bytesWritten, off + len);
+  leftOver.used = leftOver.length; // used the whole thing...
 
-  if (this._connecting) return false;
+  //  util.error('data.used = ' + data.used);
+  //if (!this._writeQueue) initWriteStream(this);
 
-  this._onWritable(); // Insert writeWatcher into the dumpQueue
-  require('timers').active(this);
+  // data should be the next thing to write.
+  this._writeQueue.unshift(leftOver);
+  this._writeQueueEncoding.unshift(null);
 
-  if (queueSize > (64*1024) && !this._haveTriedFlush) {
-    IOWatcher.flush();
-    this._haveTriedFlush = true;
+  // If didn't successfully write any bytes, enqueue our fd and try again
+  if (!bytesWritten) {
+    this._writeQueueFD.unshift(fd);
   }
 
-  return queueSize < (64*1024);
+  return false;
+};
+
+
+// Flushes the write buffer out.
+// Returns true if the entire buffer was flushed.
+Stream.prototype.flush = function () {
+  while (this._writeQueue && this._writeQueue.length) {
+    var data = this._writeQueue.shift();
+    var encoding = this._writeQueueEncoding.shift();
+    var fd = this._writeQueueFD.shift();
+
+    if (data === END_OF_FILE) {
+      this._shutdown();
+      return true;
+    }
+
+    var flushed = this._writeOut(data,encoding,fd);
+    if (!flushed) return false;
+  }
+  if (this._writeWatcher) this._writeWatcher.stop();
+  return true;
+};
+
+
+Stream.prototype._writeQueueLast = function () {
+  return this._writeQueue.length > 0 ? this._writeQueue[this._writeQueue.length-1]
+                                     : null;
 };
 
 
@@ -377,12 +473,6 @@ Stream.prototype._onConnect = function () {
     this._connecting = false;
     this.resume();
     this.readable = this.writable = true;
-
-    if (this._writeWatcher.firstBucket) {
-      // Flush this in case any writes are queued up while connecting.
-      this._onWritable();
-    }
-
     try {
       this.emit('connect');
     } catch (e) {
@@ -390,6 +480,12 @@ Stream.prototype._onConnect = function () {
       return;
     }
 
+
+    if (this._writeQueue && this._writeQueue.length) {
+      // Flush this in case any writes are queued up while connecting.
+      this._onWritable();
+    }
+
   } else if (errno != EINPROGRESS) {
     this.destroy(errnoException(errno, 'connect'));
   }
@@ -397,10 +493,11 @@ Stream.prototype._onConnect = function () {
 
 
 Stream.prototype._onWritable = function () {
-  // Stick it into the dumpQueue
-  if (!this._writeWatcher.next) {
-    this._writeWatcher.next = IOWatcher.dumpQueue.next;
-    IOWatcher.dumpQueue.next = this._writeWatcher;
+  // Stream becomes writable on connect() but don't flush if there's
+  // nothing actually to write
+  if (this.flush()) {
+    if (this._events && this._events['drain']) this.emit("drain");
+    if (this.ondrain) this.ondrain(); // Optimization
   }
 };
 
@@ -541,12 +638,7 @@ Stream.prototype.pause = function () {
 
 
 Stream.prototype.resume = function () {
-  if (this.fd === null) {
-    // TODO, FIXME: throwing here breaks test/simple/test-pipe.js
-    // throw new Error('Cannot resume() closed Stream.');
-    return;
-  }
-  this._readWatcher.stop();
+  if (this.fd === null) throw new Error('Cannot resume() closed Stream.');
   this._readWatcher.set(this.fd, true, false);
   this._readWatcher.start();
 };
@@ -556,14 +648,15 @@ Stream.prototype.destroy = function (exception) {
   // pool is shared between sockets, so don't need to free it here.
   var self = this;
 
-  this._eof = this.readable = this.writable = false;
+  // TODO would like to set _writeQueue to null to avoid extra object alloc,
+  // but lots of code assumes this._writeQueue is always an array.
+  this._writeQueue = [];
+
+  this.readable = this.writable = false;
 
   if (this._writeWatcher) {
     this._writeWatcher.stop();
     this._writeWatcher.socket = null;
-    this._writeWatcher.firstBucket = null;
-    this._writeWatcher.lastBucket = null;
-    this._writeWatcher.isUnixSocket = false;
     ioWatchers.free(this._writeWatcher);
     this._writeWatcher = null;
   }
@@ -602,7 +695,7 @@ Stream.prototype._shutdown = function () {
       this.writable = false;
 
       try {
-        shutdown(this.fd, 'write');
+        this._shutdownImpl();
       } catch (e) {
         this.destroy(e);
       }
@@ -615,14 +708,15 @@ Stream.prototype._shutdown = function () {
 
 
 Stream.prototype.end = function (data, encoding) {
-  if (!this.writable) return; // TODO this should throw error
-  if (this._eof) return; // TODO this should also throw error
-
-  if (data) this._appendBucket(data, encoding);
-  this._eof = true;
-
-  // If this isn't in the dumpQueue then we shutdown now.
-  if (!this._writeWatcher.firstBucket) this._shutdown();
+  if (this.writable) {
+    if (this._writeQueueLast() !== END_OF_FILE) {
+      if (data) this.write(data, encoding);
+      this._writeQueue.push(END_OF_FILE);
+      if (!this._connecting) {
+        this.flush();
+      }
+    }
+  }
 };
 
 
index cdcd66d..46c56e2 100644 (file)
@@ -1967,8 +1967,6 @@ int Start(int argc, char *argv[]) {
 
     Tick();
 
-    IOWatcher::Dump();
-
   } while (need_tick_cb || ev_activecnt(EV_DEFAULT_UC) > 0);
 
 
index ea445de..31e7da4 100644 (file)
@@ -29,10 +29,6 @@ process.assert = function (x, msg) {
 
 var writeError = process.binding('stdio').writeError;
 
-// Need to force-load this binding so that we can IOWatcher::Dump in
-// src/node.cc
-var IOWatcher = process.binding('io_watcher');
-
 // nextTick()
 
 var nextTickQueue = [];
index 00a6676..a042c80 100644 (file)
@@ -82,8 +82,7 @@ static size_t ByteLength (Handle<String> string, enum encoding enc) {
 }
 
 
-Local<Object> Buffer::New(Handle<String> string,
-                          Handle<Value> encoding) {
+Handle<Object> Buffer::New(Handle<String> string) {
   HandleScope scope;
 
   // get Buffer from global scope.
@@ -92,9 +91,8 @@ Local<Object> Buffer::New(Handle<String> string,
   assert(bv->IsFunction());
   Local<Function> b = Local<Function>::Cast(bv);
 
-  Local<Value> argv[2] = { Local<Value>::New(string),
-                           Local<Value>::New(encoding) };
-  Local<Object> instance = b->NewInstance(2, argv);
+  Local<Value> argv[1] = { Local<Value>::New(string) };
+  Local<Object> instance = b->NewInstance(1, argv);
 
   return scope.Close(instance);
 }
index fa39b1e..79fa34d 100644 (file)
@@ -25,9 +25,7 @@ class Buffer : public ObjectWrap {
   typedef void (*free_callback)(char *data, void *hint);
 
   // C++ API for constructing fast buffer
-  static v8::Local<v8::Object> New(
-      v8::Handle<v8::String> string,
-      v8::Handle<v8::Value> encoding = v8::Handle<v8::Value>());
+  static v8::Handle<v8::Object> New(v8::Handle<v8::String> string);
 
   static void Initialize(v8::Handle<v8::Object> target);
   static Buffer* New(size_t length); // public constructor
index a947757..7e51615 100644 (file)
@@ -2,44 +2,17 @@
 #include <node_io_watcher.h>
 
 #include <node.h>
-#include <node_buffer.h>
 #include <v8.h>
 
-
-#include <sys/uio.h> /* writev */
-#include <errno.h>
-#include <limits.h> /* IOV_MAX */
-
-#include <sys/types.h>
-#include <sys/socket.h>
-
-
 #include <assert.h>
 
 namespace node {
 
 using namespace v8;
 
-static ev_prepare dumper;
-static Persistent<Object> dump_queue;
-
 Persistent<FunctionTemplate> IOWatcher::constructor_template;
 Persistent<String> callback_symbol;
 
-static Persistent<String> next_sym;
-static Persistent<String> prev_sym;
-static Persistent<String> ondrain_sym;
-static Persistent<String> onerror_sym;
-static Persistent<String> data_sym;
-static Persistent<String> encoding_sym;
-static Persistent<String> offset_sym;
-static Persistent<String> fd_sym;
-static Persistent<String> is_unix_socket_sym;
-static Persistent<String> first_bucket_sym;
-static Persistent<String> last_bucket_sym;
-static Persistent<String> queue_size_sym;
-static Persistent<String> callback_sym;
-
 
 void IOWatcher::Initialize(Handle<Object> target) {
   HandleScope scope;
@@ -53,39 +26,9 @@ void IOWatcher::Initialize(Handle<Object> target) {
   NODE_SET_PROTOTYPE_METHOD(constructor_template, "stop", IOWatcher::Stop);
   NODE_SET_PROTOTYPE_METHOD(constructor_template, "set", IOWatcher::Set);
 
-  Local<Function> io_watcher = constructor_template->GetFunction();
-  target->Set(String::NewSymbol("IOWatcher"), io_watcher);
-
-  NODE_SET_METHOD(constructor_template->GetFunction(),
-                  "flush",
-                  IOWatcher::Flush);
+  target->Set(String::NewSymbol("IOWatcher"), constructor_template->GetFunction());
 
   callback_symbol = NODE_PSYMBOL("callback");
-
-  next_sym = NODE_PSYMBOL("next");
-  prev_sym = NODE_PSYMBOL("prev");
-  ondrain_sym = NODE_PSYMBOL("ondrain");
-  onerror_sym = NODE_PSYMBOL("onerror");
-  first_bucket_sym = NODE_PSYMBOL("firstBucket");
-  last_bucket_sym = NODE_PSYMBOL("lastBucket");
-  queue_size_sym = NODE_PSYMBOL("queueSize");
-  offset_sym = NODE_PSYMBOL("offset");
-  fd_sym = NODE_PSYMBOL("fd");
-  is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket");
-  data_sym = NODE_PSYMBOL("data");
-  encoding_sym = NODE_PSYMBOL("encoding");
-  callback_sym = NODE_PSYMBOL("callback");
-
-
-  ev_prepare_init(&dumper, IOWatcher::Dump);
-  ev_prepare_start(EV_DEFAULT_UC_ &dumper);
-  // Need to make sure that Dump runs *after* all other prepare watchers -
-  // in particular the next tick one.
-  ev_set_priority(&dumper, EV_MINPRI);
-  ev_unref(EV_DEFAULT_UC);
-
-  dump_queue = Persistent<Object>::New(Object::New());
-  io_watcher->Set(String::NewSymbol("dumpQueue"), dump_queue);
 }
 
 
@@ -201,384 +144,5 @@ Handle<Value> IOWatcher::Set(const Arguments& args) {
 }
 
 
-Handle<Value> IOWatcher::Flush(const Arguments& args) {
-  HandleScope scope; // unneccessary?
-  IOWatcher::Dump();
-  return Undefined();
-}
-
-#define KB 1024
-
-/*
- * A large javascript object structure is built up in net.js. The function
- * Dump is called at the end of each iteration, before select() is called,
- * to push all the data out to sockets.
- *
- * The structure looks like this:
- *
- * IOWatcher . dumpQueue
- *               |
- *               watcher . buckets - b - b - b - b
- *               |
- *               watcher . buckets - b - b
- *               |
- *               watcher . buckets - b
- *               |
- *               watcher . buckets - b - b - b
- *
- * The 'b' nodes are little javascript objects buckets. Each has a 'data'
- * member. 'data' is either a string or buffer. E.G.
- *
- *   b = { data: "hello world" }
- *
- */
-
-// To enable this debug output, add '-DDUMP_DEBUG' to CPPFLAGS
-// in 'build/c4che/default.cache.py' and 'make clean all'
-#ifdef DUMP_DEBUG
-#define DEBUG_PRINT(fmt,...) \
-  fprintf(stderr, "(dump:%d) " fmt "\n",  __LINE__, ##__VA_ARGS__)
-#else
-#define DEBUG_PRINT(fmt,...)
-#endif
-
-
-void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
-  assert(revents == EV_PREPARE);
-  assert(w == &dumper);
-  Dump();
-}
-
-
-void IOWatcher::Dump() {
-  HandleScope scope;
-
-  static struct iovec iov[IOV_MAX];
-
-  // Loop over all watchers in the dump queue. Each one stands for a socket
-  // that has stuff to be written out.
-  //
-  // There are several possible outcomes for each watcher.
-  // 1. All the buckets associated with the watcher are written out. In this
-  //    case the watcher is disabled; it is removed from the dump_queue.
-  // 2. Some of the data was written, but there still remains buckets. In
-  //    this case the watcher is enabled (i.e. we wait for the file
-  //    descriptor to become readable) and we remove it from the dump_queue.
-  //    When it becomes readable, we'll get a callback in net.js and add it
-  //    again to the dump_queue
-  // 3. writev returns EAGAIN. This is the same as case 2.
-  //
-  // In any case, the dump queue should be empty when we exit this function.
-  // (See the assert at the end of the outermost for loop.
-  Local<Value> watcher_v;
-  Local<Object> watcher;
-
-  for (watcher_v = dump_queue->Get(next_sym);
-       watcher_v->IsObject();
-       dump_queue->Set(next_sym, (watcher_v = watcher->Get(next_sym))),
-       watcher->Set(next_sym, Null())) {
-    watcher = watcher_v->ToObject();
-
-    IOWatcher *io = ObjectWrap::Unwrap<IOWatcher>(watcher);
-
-    // stats (just for fun)
-    io->dumps_++;
-    io->last_dump_ = ev_now(EV_DEFAULT_UC);
-
-    DEBUG_PRINT("<%d> dumping", io->watcher_.fd);
-
-    // Number of items we've stored in iov
-    int iovcnt = 0;
-    // Number of bytes we've stored in iov
-    size_t to_write = 0;
-
-    bool unix_socket = false;
-    if (watcher->Has(is_unix_socket_sym) && watcher->Get(is_unix_socket_sym)->IsTrue()) {
-      unix_socket = true;
-    }
-
-    // Unix sockets don't like huge messages. TCP sockets do.
-    // TODO: handle EMSGSIZE after sendmsg().
-    size_t max_to_write = unix_socket ? 8*KB : 256*KB;
-
-    int fd_to_send = -1;
-
-    // Offset is only as large as the first buffer of data. (See assert
-    // below) Offset > 0 occurs when a previous writev could not entirely
-    // drain a bucket.
-    size_t offset = 0;
-    if (watcher->Has(offset_sym)) {
-      offset = watcher->Get(offset_sym)->Uint32Value();
-    }
-    size_t first_offset = offset;
-
-    DEBUG_PRINT("<%d> offset=%ld", io->watcher_.fd, offset);
-
-    // Loop over all the buckets for this particular watcher/socket in order
-    // to fill iov.
-    Local<Value> bucket_v;
-    Local<Object> bucket;
-    unsigned int bucket_index = 0;
-
-    for (bucket_v = watcher->Get(first_bucket_sym);
-           // Break if we have an FD to send.
-           // sendmsg can only handle one FD at a time.
-           fd_to_send < 0 &&
-           // break if we've hit the end
-           bucket_v->IsObject() &&
-           // break if iov contains a lot of data
-           to_write < max_to_write &&
-           // break if iov is running out of space
-           iovcnt < IOV_MAX;
-         bucket_v = bucket->Get(next_sym), bucket_index++) {
-      assert(bucket_v->IsObject());
-      bucket = bucket_v->ToObject();
-
-      Local<Value> data_v = bucket->Get(data_sym);
-      // net.js will be setting this 'data' value. We can ensure that it is
-      // never empty.
-      assert(!data_v.IsEmpty());
-
-      Local<Object> buf_object;
-
-      if (data_v->IsString()) {
-        // TODO: insert v8::String::Pointers() hack here.
-        Local<String> s = data_v->ToString();
-        Local<Value> e = bucket->Get(encoding_sym);
-        buf_object = Buffer::New(s, e);
-        bucket->Set(data_sym, buf_object);
-      } else {
-        assert(Buffer::HasInstance(data_v));
-        buf_object = data_v->ToObject();
-      }
-
-      size_t l = Buffer::Length(buf_object);
-
-      if (l == 0) continue;
-
-      assert(first_offset < l);
-      iov[iovcnt].iov_base = Buffer::Data(buf_object) + first_offset;
-      iov[iovcnt].iov_len = l - first_offset;
-      to_write += iov[iovcnt].iov_len;
-      iovcnt++;
-
-      first_offset = 0; // only the first buffer will be offset.
-
-      if (unix_socket && bucket->Has(fd_sym)) {
-        Local<Value> fd_v = bucket->Get(fd_sym);
-        if (fd_v->IsInt32()) {
-          fd_to_send = fd_v->Int32Value();
-          DEBUG_PRINT("<%d> got fd to send: %d", io->watcher_.fd, fd_to_send);
-          assert(fd_to_send >= 0);
-        }
-      }
-    }
-
-    if (to_write > 0) {
-      ssize_t written;
-
-      if (unix_socket) {
-        struct msghdr msg;
-        char scratch[64];
-
-        msg.msg_name = NULL;
-        msg.msg_namelen = 0;
-        msg.msg_iov = iov;
-        msg.msg_iovlen = iovcnt;
-        msg.msg_control = NULL; // void*
-        msg.msg_controllen = 0; // socklen_t
-        msg.msg_flags = 0; // int
-
-        if (fd_to_send >= 0) {
-          struct cmsghdr *cmsg;
-
-          msg.msg_control = (void *) scratch;
-          msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
-
-          cmsg = CMSG_FIRSTHDR(&msg);
-          cmsg->cmsg_level = SOL_SOCKET;
-          cmsg->cmsg_type = SCM_RIGHTS;
-          cmsg->cmsg_len = msg.msg_controllen;
-          *(int*) CMSG_DATA(cmsg) = fd_to_send;
-        }
-
-        written = sendmsg(io->watcher_.fd, &msg, 0);
-      } else {
-        written = writev(io->watcher_.fd, iov, iovcnt);
-      }
-
-      DEBUG_PRINT("<%d> iovcnt: %d, to_write: %ld, written: %ld",
-                  io->watcher_.fd,
-                  iovcnt,
-                  to_write,
-                  written);
-
-      if (written < 0) {
-        // Allow EAGAIN.
-        // TODO: handle EMSGSIZE after sendmsg().
-        if (errno == EAGAIN) {
-          DEBUG_PRINT("<%d> EAGAIN", io->watcher_.fd);
-          io->Start();
-        } else {
-          // Emit error event
-          if (watcher->Has(onerror_sym)) {
-            Local<Value> callback_v = io->handle_->Get(onerror_sym);
-            assert(callback_v->IsFunction());
-            Local<Function> callback = Local<Function>::Cast(callback_v);
-
-            Local<Value> argv[1] = { Integer::New(errno) };
-
-            TryCatch try_catch;
-
-            callback->Call(io->handle_, 1, argv);
-
-            if (try_catch.HasCaught()) {
-              FatalException(try_catch);
-            }
-          }
-        }
-        // Continue with the next watcher.
-        continue;
-      }
-
-      // what about written == 0 ?
-
-      size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value();
-      DEBUG_PRINT("<%d> queue_size=%ld", io->watcher_.fd, queue_size);
-      assert(queue_size >= offset);
-
-      // Now drop the buckets that have been written.
-      bucket_index = 0;
-
-      while (written > 0) {
-        bucket_v = watcher->Get(first_bucket_sym);
-        if (!bucket_v->IsObject()) {
-          // No more buckets in the queue. Make sure the last_bucket_sym is
-          // updated and then go to the next watcher.
-          watcher->Set(last_bucket_sym, Null());
-          break;
-        }
-
-        bucket = bucket_v->ToObject();
-
-        Local<Value> data_v = bucket->Get(data_sym);
-        assert(!data_v.IsEmpty());
-
-        // At the moment we're turning all string into buffers
-        // so we assert that this is not a string. However, when the
-        // "Pointer patch" lands, this assert will need to be removed.
-        assert(!data_v->IsString());
-        // When the "Pointer patch" lands, we will need to be careful
-        // to somehow store the length of strings that we're optimizing on
-        // so that it need not be recalculated here. Note the "Pointer patch"
-        // will only apply to ASCII strings - UTF8 ones will need to be
-        // serialized onto a buffer.
-        size_t bucket_len = Buffer::Length(data_v->ToObject());
-
-        if (unix_socket && bucket->Has(fd_sym)) {
-          bucket->Set(fd_sym, Null());
-        }
-
-        DEBUG_PRINT("<%d,%ld> bucket_len: %ld, offset: %ld",
-                    io->watcher_.fd,
-                    bucket_index,
-                    bucket_len,
-                    offset);
-        assert(bucket_len > offset);
-
-        // Only on the first bucket does is the offset > 0.
-        if (offset + written < bucket_len) {
-          // we have not written the entire bucket
-          DEBUG_PRINT("<%d,%ld> Only wrote part of the buffer. "
-                      "setting watcher.offset = %ld",
-                      io->watcher_.fd,
-                      bucket_index,
-                      offset + written);
-
-          watcher->Set(offset_sym,
-                       Integer::NewFromUnsigned(offset + written));
-          break;
-        } else {
-          DEBUG_PRINT("<%d,%ld> wrote the whole bucket. discarding.",
-                      io->watcher_.fd,
-                      bucket_index);
-
-          assert(bucket_len <= queue_size);
-          queue_size -= bucket_len;
-
-          assert(bucket_len - offset <= written);
-          written -= bucket_len - offset;
-
-          Local<Value> bucket_callback_v = bucket->Get(callback_sym);
-          if (bucket_callback_v->IsFunction()) {
-            Local<Function> bucket_callback =
-              Local<Function>::Cast(bucket_callback_v);
-            TryCatch try_catch;
-            bucket_callback->Call(io->handle_, 0, NULL);
-            if (try_catch.HasCaught()) {
-              FatalException(try_catch);
-            }
-          }
-
-          // Offset is now zero
-          watcher->Set(offset_sym, Integer::NewFromUnsigned(0));
-        }
-
-        offset = 0; // the next bucket will have zero offset;
-        bucket_index++;
-
-        // unshift
-        watcher->Set(first_bucket_sym, bucket->Get(next_sym));
-      }
-
-      watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
-    }
-
-
-    // Finished dumping the buckets.
-    //
-    // If our list of buckets is empty, we can emit 'drain' and forget about
-    // this socket. Nothing needs to be done.
-    //
-    // Otherwise we need to prepare the io_watcher to wait for the interface
-    // to become writable again.
-
-    if (watcher->Get(first_bucket_sym)->IsObject()) {
-      // Still have buckets to be written. Wait for fd to become writable.
-      io->Start();
-
-      DEBUG_PRINT("<%d> Started watcher", io->watcher_.fd);
-    } else {
-      // No more buckets in the queue. Make sure the last_bucket_sym is
-      // updated and then go to the next watcher.
-      watcher->Set(last_bucket_sym, Null());
-
-      // Emptied the buckets queue for this socket.  Don't wait for it to
-      // become writable.
-      io->Stop();
-
-      DEBUG_PRINT("<%d> Stop watcher", io->watcher_.fd);
-
-      // Emit drain event
-      if (watcher->Has(ondrain_sym)) {
-        Local<Value> callback_v = io->handle_->Get(ondrain_sym);
-        assert(callback_v->IsFunction());
-        Local<Function> callback = Local<Function>::Cast(callback_v);
-
-        TryCatch try_catch;
-
-        callback->Call(io->handle_, 0, NULL);
-
-        if (try_catch.HasCaught()) {
-          FatalException(try_catch);
-        }
-      }
-    }
-  }
-
-  // Assert that the dump_queue is empty.
-  assert(!dump_queue->Get(next_sym)->IsObject());
-}
-
 
 }  // namespace node
index 71d7142..06d431e 100644 (file)
@@ -10,7 +10,6 @@ namespace node {
 class IOWatcher : ObjectWrap {
  public:
   static void Initialize(v8::Handle<v8::Object> target);
-  static void Dump();
 
  protected:
   static v8::Persistent<v8::FunctionTemplate> constructor_template;
@@ -27,7 +26,6 @@ class IOWatcher : ObjectWrap {
   }
 
   static v8::Handle<v8::Value> New(const v8::Arguments& args);
-  static v8::Handle<v8::Value> Flush(const v8::Arguments& args);
   static v8::Handle<v8::Value> Start(const v8::Arguments& args);
   static v8::Handle<v8::Value> Stop(const v8::Arguments& args);
   static v8::Handle<v8::Value> Set(const v8::Arguments& args);
@@ -35,15 +33,9 @@ class IOWatcher : ObjectWrap {
  private:
   static void Callback(EV_P_ ev_io *watcher, int revents);
 
-  static void Dump(EV_P_ ev_prepare *watcher, int revents);
-
   void Start();
   void Stop();
 
-  // stats. TODO: expose to js, add reset() method
-  uint64_t dumps_;
-  ev_tstamp last_dump_;
-
   ev_io watcher_;
 };
 
index f068fc8..a74e2dc 100644 (file)
@@ -3,7 +3,6 @@
 
 #include <node.h>
 #include <node_buffer.h>
-#include <node_io_watcher.h>
 
 #include <string.h>
 #include <stdlib.h>
@@ -38,6 +37,7 @@
 
 #define ARRAY_SIZE(a) (sizeof(a) / sizeof(*(a)))
 
+
 namespace node {
 
 using namespace v8;
index 8f06469..09b2864 100644 (file)
@@ -22,33 +22,35 @@ function processData(s) {
   // version of our modified object back. Clean up when we're done.
   var pipeStream = new net.Stream(fd);
 
-  pipeStream.resume();
-
-  pipeStream.write(JSON.stringify(d) + '\n', function () {
+  var drainFunc = function() {
     pipeStream.destroy();
 
     if (++numSentMessages == 2) {
       s.destroy();
     }
-  });
+  };
+
+  pipeStream.addListener('drain', drainFunc);
+  pipeStream.resume();
+
+  if (pipeStream.write(JSON.stringify(d) + '\n')) {
+    drainFunc();
+  }
 };
 
 // Create a UNIX socket to the path defined by argv[2] and read a file
 // descriptor and misc data from it.
 var s = new net.Stream();
-
 s.addListener('fd', function(fd) {
   receivedFDs.unshift(fd);
   processData(s);
 });
-
 s.addListener('data', function(data) {
   data.toString('utf8').trim().split('\n').forEach(function(d) {
     receivedData.unshift(JSON.parse(d));
   });
   processData(s);
 });
-
 s.connect(process.argv[2]);
 
 // vim:ts=2 sw=2 et
diff --git a/test/simple/test-dumper-unix.js b/test/simple/test-dumper-unix.js
deleted file mode 100644 (file)
index 2e51036..0000000
+++ /dev/null
@@ -1,135 +0,0 @@
-var assert =require('assert');
-var IOWatcher = process.binding('io_watcher').IOWatcher;
-var errnoException = process.binding('net').errnoException;
-var close = process.binding('net').close;
-var net = require('net');
-
-var ncomplete = 0;
-
-function test (N, b, cb) {
-  var fdsSent = 0;
-  var fdsRecv = 0;
-  //console.trace();
-  var expected = N * b.length;
-  var nread = 0;
-
-  // Create a socketpair
-  var fds = process.binding('net').socketpair();
-
-  // Use writev/dumper to send data down the one of the sockets, fds[1].
-  // This requires a IOWatcher.
-  var w = new IOWatcher();
-  w.set(fds[1], false, true);
-  w.isUnixSocket = true;
-
-  w.callback = function (readable, writable) {
-    assert.ok(!readable && writable); // not really important.
-    // Insert watcher into dumpQueue
-    w.next = IOWatcher.dumpQueue.next;
-    IOWatcher.dumpQueue.next = w;
-  }
-
-  var ndrain = 0;
-  w.ondrain = function () {
-    ndrain++;
-  }
-
-  var nerror = 0;
-  w.onerror = function (errno) {
-    throw errnoException(errno);
-    nerror++;
-  }
-
-  // The read end, fds[0], will be used to count how much comes through.
-  // This sets up a readable stream on fds[0].
-  var stream = new net.Stream({ fd: fds[0], type: 'unix' });
-  //stream.readable = true;
-  stream.resume();
-
-  stream.on('fd', function (fd) {
-    console.log('got fd %d', fd);
-    fdsRecv++;
-  });
-
-  // Count the data as it arrives on the other end
-  stream.on('data', function (d) {
-    nread += d.length;
-
-    if (nread >= expected) {
-      assert.ok(nread === expected);
-      assert.equal(1, ndrain);
-      assert.equal(0, nerror);
-      console.error("done. wrote %d bytes\n", nread);
-      close(fds[1]);
-    }
-  });
-
-
-  stream.on('close', function () {
-    assert.equal(fdsSent, fdsRecv);
-    // check to make sure the watcher isn't in the dump queue.
-    for (var x = IOWatcher.dumpQueue; x; x = x.next) {
-      assert.ok(x !== w);
-    }
-    assert.equal(null, w.next);
-    // completely flushed
-    assert.ok(!w.firstBucket);
-    assert.ok(!w.lastBucket);
-
-    ncomplete++;
-    if (cb) cb();
-  });
-
-
-  // Insert watcher into dumpQueue
-  w.next = IOWatcher.dumpQueue.next;
-  IOWatcher.dumpQueue.next = w;
-
-  w.firstBucket = { data: b };
-  w.lastBucket = w.firstBucket;
-  w.queueSize = b.length;
-
-  for (var i = 0; i < N-1; i++) {
-    var bucket = { data: b };
-    w.lastBucket.next = bucket;
-    w.lastBucket = bucket;
-    w.queueSize += b.length;
-    // Kind of randomly fill these buckets with fds.
-    if (fdsSent < 5 && i % 2 == 0) {
-      bucket.fd = 1; // send stdout
-      fdsSent++;
-    }
-  }
-}
-
-
-function runTests (values) {
-  expectedToComplete = values.length;
-
-  function go () {
-    if (ncomplete < values.length) {
-      var v = values[ncomplete];
-      console.log("test N=%d, size=%d", v[0], v[1].length);
-      test(v[0], v[1], go);
-    }
-  }
-
-  go();
-}
-
-runTests([ [30, Buffer(1000)]
-         , [4, Buffer(10000)]
-         , [1, "hello world\n"]
-         , [50, Buffer(1024*1024)]
-         , [500, Buffer(40960+1)]
-         , [500, Buffer(40960-1)]
-         , [500, Buffer(40960)]
-         , [500, Buffer(1024*1024+1)]
-         , [50000, "hello world\n"]
-         ]);
-
-
-process.on('exit', function () {
-  assert.equal(expectedToComplete, ncomplete);
-});
-
diff --git a/test/simple/test-dumper.js b/test/simple/test-dumper.js
deleted file mode 100644 (file)
index c159774..0000000
+++ /dev/null
@@ -1,128 +0,0 @@
-var assert =require('assert');
-var IOWatcher = process.binding('io_watcher').IOWatcher;
-var errnoException = process.binding('net').errnoException;
-var close = process.binding('net').close;
-var net = require('net');
-
-var ncomplete = 0;
-
-
-
-
-
-function test (N, b, cb) {
-  //console.trace();
-  var expected = N * b.length;
-  var nread = 0;
-
-  // Create a pipe
-  var fds = process.binding('net').pipe();
-  console.log("fds == %j", fds);
-
-  // Use writev/dumper to send data down the write end of the pipe, fds[1].
-  // This requires a IOWatcher.
-  var w = new IOWatcher();
-  w.set(fds[1], false, true);
-
-  w.callback = function (readable, writable) {
-    assert.ok(!readable && writable); // not really important.
-    // Insert watcher into dumpQueue
-    w.next = IOWatcher.dumpQueue.next;
-    IOWatcher.dumpQueue.next = w;
-  }
-
-  var ndrain = 0;
-  w.ondrain = function () {
-    ndrain++;
-  }
-
-  var nerror = 0;
-  w.onerror = function (errno) {
-    throw errnoException(errno);
-    nerror++;
-  }
-
-  // The read end, fds[0], will be used to count how much comes through.
-  // This sets up a readable stream on fds[0].
-  var stream = new net.Stream();
-  stream.open(fds[0]);
-  stream.readable = true;
-  stream.resume();
-
-  // Count the data as it arrives on the read end of the pipe.
-  stream.on('data', function (d) {
-    nread += d.length;
-
-    if (nread >= expected) {
-      assert.ok(nread === expected);
-      assert.equal(1, ndrain);
-      assert.equal(0, nerror);
-      console.error("done. wrote %d bytes\n", nread);
-      close(fds[1]);
-    }
-  });
-
-  stream.on('close', function () {
-    // check to make sure the watcher isn't in the dump queue.
-    for (var x = IOWatcher.dumpQueue; x; x = x.next) {
-      assert.ok(x !== w);
-    }
-    assert.equal(null, w.next);
-    // completely flushed
-    assert.ok(!w.firstBucket);
-    assert.ok(!w.lastBucket);
-
-    ncomplete++;
-    if (cb) cb();
-  });
-
-
-  // Insert watcher into dumpQueue
-  w.next = IOWatcher.dumpQueue.next;
-  IOWatcher.dumpQueue.next = w;
-
-  w.firstBucket = { data: b };
-  w.lastBucket = w.firstBucket;
-  w.queueSize = b.length;
-
-  for (var i = 0; i < N-1; i++) {
-    var bucket = { data: b };
-    assert.ok(!w.lastBucket.next);
-    w.lastBucket.next = bucket;
-    w.lastBucket = bucket;
-    w.queueSize += b.length;
-  }
-}
-
-
-function runTests (values) {
-  expectedToComplete = values.length;
-
-  function go () {
-    if (ncomplete < values.length) {
-      var v = values[ncomplete];
-      console.log("test N=%d, size=%d", v[0], v[1].length);
-      test(v[0], v[1], go);
-    }
-  }
-
-  go();
-}
-
-runTests([ [3, Buffer(1000)],
-           [30, Buffer(1000)],
-           [4, Buffer(10000)],
-           [1, "hello world\n"],
-           [50, Buffer(1024*1024)],
-           [500, Buffer(40960+1)],
-           [500, Buffer(40960-1)],
-           [500, Buffer(40960)],
-           [500, Buffer(1024*1024+1)],
-           [50000, "hello world\n"]
-         ]);
-
-
-process.on('exit', function () {
-  assert.equal(expectedToComplete, ncomplete);
-});
-
index d12c2b1..75db48e 100644 (file)
@@ -17,22 +17,20 @@ var bufferSize = 5 * 1024 * 1024;
  */
 var buffer = Buffer(bufferSize);
 for (var i = 0; i < buffer.length; i++) {
-  buffer[i] = 100; //parseInt(Math.random()*10000) % 256;
+  buffer[i] = parseInt(Math.random()*10000) % 256;
 }
 
 
 var web = http.Server(function (req, res) {
   web.close();
 
-  console.log("web server connection fd=%d", req.connection.fd);
-
   console.log(req.headers);
 
   var socket = net.Stream();
   socket.connect(tcpPort);
   
   socket.on('connect', function () {
-    console.log('http->tcp connected fd=%d', socket.fd);
+    console.log('socket connected');
   });
 
   req.pipe(socket);
@@ -56,7 +54,7 @@ web.listen(webPort, startClient);
 var tcp = net.Server(function (s) {
   tcp.close();
 
-  console.log("tcp server connection fd=%d", s.fd);
+  console.log("tcp server connection");
 
   var i = 0;
 
@@ -93,11 +91,6 @@ function startClient () {
   req.write(buffer);
   req.end();
 
-  console.log("request fd=%d", req.connection.fd);
-
-  // note the queue includes http headers.
-  assert.ok(req.connection.writeQueueSize() > buffer.length);
-
   req.on('response', function (res) {
     console.log('Got response');
     res.setEncoding('utf8');
index 8052a13..7ed7b02 100644 (file)
@@ -53,7 +53,7 @@ var logChild = function(d) {
 
   d.split('\n').forEach(function(l) {
     if (l.length > 0) {
-      console.error('CHILD: ' + l);
+      common.debug('CHILD: ' + l);
     }
   });
 };
@@ -96,18 +96,19 @@ var srv = net.createServer(function(s) {
   buf.write(JSON.stringify(DATA) + '\n', 'utf8');
 
   s.write(str, 'utf8', pipeFDs[1]);
-
-  s.write(buf, pipeFDs[1], function () {
-    console.error("close pipeFDs[1]");
+  if (s.write(buf, undefined, pipeFDs[1])) {
     netBinding.close(pipeFDs[1]);
-  });
+  } else {
+    s.addListener('drain', function() {
+      netBinding.close(pipeFDs[1]);
+    });
+  }
 });
 srv.listen(SOCK_PATH);
 
 // Spawn a child running test/fixtures/recvfd.js
-var cp = child_process.spawn(process.execPath,
-                             [path.join(common.fixturesDir, 'recvfd.js'),
-                              SOCK_PATH]);
+var cp = child_process.spawn(process.argv[0],
+                             [path.join(common.fixturesDir, 'recvfd.js'), SOCK_PATH]);
 
 cp.stdout.addListener('data', logChild);
 cp.stderr.addListener('data', logChild);