Add callback to socket.write()
authorRyan Dahl <ry@tinyclouds.org>
Wed, 15 Dec 2010 23:47:02 +0000 (15:47 -0800)
committerRyan Dahl <ry@tinyclouds.org>
Wed, 15 Dec 2010 23:47:02 +0000 (15:47 -0800)
doc/api/net.markdown
lib/net.js
test/simple/test-net-connect-buffer.js
test/simple/test-net-pingpong.js

index 229ecaf..6146fd7 100644 (file)
@@ -196,7 +196,7 @@ context of the defined or default list of trusted CA certificates.
 Returns a JSON structure detailing the peer's certificate, containing a dictionary
 with keys for the certificate `'subject'`, `'issuer'`, `'valid_from'` and `'valid_to'`.
 
-#### stream.write(data, [encoding])
+#### stream.write(data, [encoding], [callback])
 
 Sends data on the stream. The second parameter specifies the encoding in the
 case of a string--it defaults to UTF8 encoding.
@@ -205,7 +205,10 @@ Returns `true` if the entire data was flushed successfully to the kernel
 buffer. Returns `false` if all or part of the data was queued in user memory.
 `'drain'` will be emitted when the buffer is again free.
 
-#### stream.write(data, [encoding], [fileDescriptor])
+The optional `callback` parameter will be executed when the data is finally
+written out - this may not be immediately.
+
+#### stream.write(data, [encoding], [fileDescriptor], [callback])
 
 For UNIX sockets, it is possible to send a file descriptor through the
 stream. Simply add the `fileDescriptor` argument and listen for the `'fd'`
index e6ec796..db491be 100644 (file)
@@ -178,6 +178,7 @@ function initStream(self) {
   self._writeQueue = [];
   self._writeQueueEncoding = [];
   self._writeQueueFD = [];
+  self._writeQueueCallbacks = [];
 
   self._writeWatcher = ioWatchers.alloc();
   self._writeWatcher.socket = self;
@@ -296,6 +297,7 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) {
       this._writeQueue = [];
       this._writeQueueEncoding = [];
       this._writeQueueFD = [];
+      this._writeQueueCallbacks = [];
     }
 
     // Slow. There is already a write queue, so let's append to it.
@@ -311,9 +313,22 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) {
         this._writeQueueEncoding[last] === encoding) {
       // optimization - concat onto last
       this._writeQueue[last] += data;
+
+      if (cb) {
+        if (!this._writeQueueCallbacks[last]) {
+          this._writeQueueCallbacks[last] = cb;
+        } else {
+          // awful
+          this._writeQueueCallbacks[last] = function () {
+            this._writeQueueCallbacks[last]();
+            cb();
+          };
+        }
+      }
     } else {
       this._writeQueue.push(data);
       this._writeQueueEncoding.push(encoding);
+      this._writeQueueCallbacks.push(cb);
     }
 
     if (fd != undefined) {
@@ -325,7 +340,7 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) {
     // Fast.
     // The most common case. There is no write queue. Just push the data
     // directly to the socket.
-    return this._writeOut(data, encoding, fd);
+    return this._writeOut(data, encoding, fd, cb);
   }
 };
 
@@ -337,7 +352,7 @@ Stream.prototype.write = function(data /* [encoding], [fd], [cb] */) {
 //   2. Write data to socket. Return true if flushed.
 //   3. Slice out remaining
 //   4. Unshift remaining onto _writeQueue. Return false.
-Stream.prototype._writeOut = function(data, encoding, fd) {
+Stream.prototype._writeOut = function(data, encoding, fd, cb) {
   if (!this.writable) {
     throw new Error('Stream is not writable');
   }
@@ -388,6 +403,7 @@ Stream.prototype._writeOut = function(data, encoding, fd) {
       // Unshift whatever didn't fit onto the buffer
       this._writeQueue.unshift(data.slice(charsWritten));
       this._writeQueueEncoding.unshift(encoding);
+      this._writeQueueCallbacks.unshift(cb);
       this._writeWatcher.start();
       queuedData = true;
     }
@@ -416,6 +432,7 @@ Stream.prototype._writeOut = function(data, encoding, fd) {
     if (queuedData) {
       return false;
     } else {
+      if (cb) cb();
       return true;
     }
   }
@@ -434,6 +451,7 @@ Stream.prototype._writeOut = function(data, encoding, fd) {
   // data should be the next thing to write.
   this._writeQueue.unshift(leftOver);
   this._writeQueueEncoding.unshift(null);
+  this._writeQueueCallbacks.unshift(cb);
 
   // If didn't successfully write any bytes, enqueue our fd and try again
   if (!bytesWritten) {
@@ -450,6 +468,7 @@ Stream.prototype.flush = function() {
   while (this._writeQueue && this._writeQueue.length) {
     var data = this._writeQueue.shift();
     var encoding = this._writeQueueEncoding.shift();
+    var cb = this._writeQueueCallbacks.shift();
     var fd = this._writeQueueFD.shift();
 
     if (data === END_OF_FILE) {
@@ -457,7 +476,7 @@ Stream.prototype.flush = function() {
       return true;
     }
 
-    var flushed = this._writeOut(data, encoding, fd);
+    var flushed = this._writeOut(data, encoding, fd, cb);
     if (!flushed) return false;
   }
   if (this._writeWatcher) this._writeWatcher.stop();
index f4cada9..6e6bc88 100644 (file)
@@ -3,6 +3,8 @@ var assert = require('assert');
 var net = require('net');
 
 var tcpPort = common.PORT;
+var fooWritten = false;
+var connectHappened = false;
 
 var tcp = net.Server(function(s) {
   tcp.close();
@@ -25,23 +27,35 @@ var tcp = net.Server(function(s) {
     process.exit(1);
   });
 });
-tcp.listen(tcpPort, startClient);
 
-function startClient() {
+tcp.listen(common.PORT, function () {
   var socket = net.Stream();
 
   console.log('Connecting to socket');
 
   socket.connect(tcpPort);
 
+
   socket.on('connect', function() {
     console.log('socket connected');
+    connectHappened = true;
   });
 
   assert.equal('opening', socket.readyState);
 
-  assert.equal(false, socket.write('foo'));
+  var r = socket.write('foo', function () {
+    fooWritten = true;
+    assert.ok(connectHappened);
+    console.error("foo written");
+  });
+
+  assert.equal(false, r);
   socket.end('bar');
 
   assert.equal('opening', socket.readyState);
-}
+});
+
+process.on('exit', function () {
+  assert.ok(connectHappened);
+  assert.ok(fooWritten);
+});
index 6630728..c154e67 100644 (file)
@@ -8,6 +8,7 @@ var tests_run = 0;
 function pingPongTest(port, host) {
   var N = 1000;
   var count = 0;
+  var sentPongs = 0;
   var sent_final_ping = false;
 
   var server = net.createServer({ allowHalfOpen: true }, function(socket) {
@@ -25,7 +26,10 @@ function pingPongTest(port, host) {
       assert.equal(true, socket.readable);
       assert.equal(true, count <= N);
       if (/PING/.exec(data)) {
-        socket.write('PONG');
+        socket.write('PONG', function () {
+          sentPongs++;
+          console.error('sent PONG');
+        });
       }
     });
 
@@ -85,8 +89,9 @@ function pingPongTest(port, host) {
     });
 
     client.addListener('close', function() {
-      console.log('client.endd');
+      console.log('client.end');
       assert.equal(N + 1, count);
+      assert.equal(N + 1, sentPongs);
       assert.equal(true, sent_final_ping);
       tests_run += 1;
     });