Add callback to socket.write(), fix test-sendfds
authorRyan Dahl <ry@tinyclouds.org>
Sat, 13 Nov 2010 00:24:53 +0000 (16:24 -0800)
committerRyan Dahl <ry@tinyclouds.org>
Fri, 19 Nov 2010 00:47:38 +0000 (16:47 -0800)
lib/net.js
src/node_io_watcher.cc
test/fixtures/recvfd.js
test/simple/test-sendfd.js

index 03b09c3c5af0320eb5ed928bd8b43ed58fdd28b4..7019d9d1078b607804e8ed3305d129b9f7629b00 100644 (file)
@@ -56,17 +56,18 @@ var ioWatchers = new FreeList("iowatcher", 100, function () {
 
 
 IOWatcher.prototype.ondrain = function () {
-  assert(this.socket);
-  var socket = this.socket;
+  if (this.socket) {
+    var socket = this.socket;
 
-  if (socket.writable || socket.readable) {
-    require('timers').active(socket);
-  }
+    if (socket.writable || socket.readable) {
+      require('timers').active(socket);
+    }
 
-  socket.emit('drain');
-  if (socket.ondrain) socket.ondrain();
+    socket.emit('drain');
+    if (socket.ondrain) socket.ondrain();
 
-  if (socket._eof) socket._shutdown();
+    if (socket._eof) socket._shutdown();
+  }
 };
 
 
@@ -252,12 +253,13 @@ Object.defineProperty(Stream.prototype, 'readyState', {
 });
 
 
-Stream.prototype._appendBucket = function (data, encoding, fd) {
+Stream.prototype._appendBucket = function (data, encoding, fd, callback) {
   if (data.length != 0) {
     // TODO reject empty data.
     var newBucket =  { data: data };
     if (encoding) newBucket.encoding = encoding;
     if (fd) newBucket.fd = fd;
+    if (callback) newBucket.callback = callback;
 
     // TODO properly calculate queueSize
 
@@ -280,7 +282,7 @@ Stream.prototype._appendBucket = function (data, encoding, fd) {
 };
 
 
-Stream.prototype.write = function (data, encoding, fd) {
+Stream.prototype.write = function (data /* encoding, fd, callback */) {
   if (this._eof) {
     throw new Error('Stream.end() called already; cannot write.');
   }
@@ -289,7 +291,29 @@ Stream.prototype.write = function (data, encoding, fd) {
     throw new Error('Stream is not writable');
   }
 
-  var queueSize = this._appendBucket(data, encoding, fd);
+  // parse the arguments. ugly.
+
+  var encoding, fd, callback;
+
+  if (arguments[1] === undefined || typeof arguments[1] == 'string') {
+    encoding = arguments[1];
+    if (typeof arguments[2] == 'number') {
+      fd = arguments[2];
+      callback = arguments[3];
+    } else {
+      callback = arguments[2];
+    }
+  } else if (typeof arguments[1] == 'number') {
+    fd = arguments[1];
+    callback = arguments[2];
+  } else if (typeof arguments[1] == 'function') {
+    callback = arguments[1];
+  } else {
+    throw new Error("Bad type for second argument");
+  }
+
+
+  var queueSize = this._appendBucket(data, encoding, fd, callback);
 
   if (this._connecting) return false;
 
index 3f850762796589d47b66d6c520b75cb4fd90a44f..62c56464632e0fc95f7c0bb763f508c56ef76a1f 100644 (file)
@@ -38,6 +38,7 @@ static Persistent<String> is_unix_socket_sym;
 static Persistent<String> first_bucket_sym;
 static Persistent<String> last_bucket_sym;
 static Persistent<String> queue_size_sym;
+static Persistent<String> callback_sym;
 
 
 void IOWatcher::Initialize(Handle<Object> target) {
@@ -73,6 +74,7 @@ void IOWatcher::Initialize(Handle<Object> target) {
   is_unix_socket_sym = NODE_PSYMBOL("isUnixSocket");
   data_sym = NODE_PSYMBOL("data");
   encoding_sym = NODE_PSYMBOL("encoding");
+  callback_sym = NODE_PSYMBOL("callback");
 
 
   ev_prepare_init(&dumper, IOWatcher::Dump);
@@ -497,6 +499,17 @@ void IOWatcher::Dump() {
 
         written -= bucket_len - offset;
 
+        Local<Value> bucket_callback_v = bucket->Get(callback_sym);
+        if (bucket_callback_v->IsFunction()) {
+          Local<Function> bucket_callback =
+            Local<Function>::Cast(bucket_callback_v);
+          TryCatch try_catch;
+          bucket_callback->Call(io->handle_, 0, NULL);
+          if (try_catch.HasCaught()) {
+            FatalException(try_catch);
+          }
+        }
+
         // Offset is now zero
         watcher->Set(offset_sym, Integer::NewFromUnsigned(0));
       }
index 09b2864b7ed2e25dca238cbc31d85717c0acd0d5..8f064693895d048349153e4099d1a088554702c3 100644 (file)
@@ -22,35 +22,33 @@ function processData(s) {
   // version of our modified object back. Clean up when we're done.
   var pipeStream = new net.Stream(fd);
 
-  var drainFunc = function() {
+  pipeStream.resume();
+
+  pipeStream.write(JSON.stringify(d) + '\n', function () {
     pipeStream.destroy();
 
     if (++numSentMessages == 2) {
       s.destroy();
     }
-  };
-
-  pipeStream.addListener('drain', drainFunc);
-  pipeStream.resume();
-
-  if (pipeStream.write(JSON.stringify(d) + '\n')) {
-    drainFunc();
-  }
+  });
 };
 
 // Create a UNIX socket to the path defined by argv[2] and read a file
 // descriptor and misc data from it.
 var s = new net.Stream();
+
 s.addListener('fd', function(fd) {
   receivedFDs.unshift(fd);
   processData(s);
 });
+
 s.addListener('data', function(data) {
   data.toString('utf8').trim().split('\n').forEach(function(d) {
     receivedData.unshift(JSON.parse(d));
   });
   processData(s);
 });
+
 s.connect(process.argv[2]);
 
 // vim:ts=2 sw=2 et
index 7ed7b02c1a9c5ba6feae5488f0464a11dc251cc0..8052a1366737938c8f3b40eac234667b9dfd87d4 100644 (file)
@@ -53,7 +53,7 @@ var logChild = function(d) {
 
   d.split('\n').forEach(function(l) {
     if (l.length > 0) {
-      common.debug('CHILD: ' + l);
+      console.error('CHILD: ' + l);
     }
   });
 };
@@ -96,19 +96,18 @@ var srv = net.createServer(function(s) {
   buf.write(JSON.stringify(DATA) + '\n', 'utf8');
 
   s.write(str, 'utf8', pipeFDs[1]);
-  if (s.write(buf, undefined, pipeFDs[1])) {
+
+  s.write(buf, pipeFDs[1], function () {
+    console.error("close pipeFDs[1]");
     netBinding.close(pipeFDs[1]);
-  } else {
-    s.addListener('drain', function() {
-      netBinding.close(pipeFDs[1]);
-    });
-  }
+  });
 });
 srv.listen(SOCK_PATH);
 
 // Spawn a child running test/fixtures/recvfd.js
-var cp = child_process.spawn(process.argv[0],
-                             [path.join(common.fixturesDir, 'recvfd.js'), SOCK_PATH]);
+var cp = child_process.spawn(process.execPath,
+                             [path.join(common.fixturesDir, 'recvfd.js'),
+                              SOCK_PATH]);
 
 cp.stdout.addListener('data', logChild);
 cp.stderr.addListener('data', logChild);