Fix a bug regarding queueSize, add asserts
authorRyan Dahl <ry@tinyclouds.org>
Thu, 18 Nov 2010 19:45:32 +0000 (11:45 -0800)
committerRyan Dahl <ry@tinyclouds.org>
Fri, 19 Nov 2010 00:47:38 +0000 (16:47 -0800)
src/node_io_watcher.cc
test/simple/test-dumper-unix.js
test/simple/test-dumper.js
test/simple/test-pipe.js

index 71a640cb0e5b87fae463bc0df9bd700eb4569254..a9477573f023bde945176088730b425050a0b507 100644 (file)
@@ -285,7 +285,7 @@ void IOWatcher::Dump() {
     io->dumps_++;
     io->last_dump_ = ev_now(EV_DEFAULT_UC);
 
-    DEBUG_PRINT("dumping fd %d", io->watcher_.fd);
+    DEBUG_PRINT("<%d> dumping", io->watcher_.fd);
 
     // Number of items we've stored in iov
     int iovcnt = 0;
@@ -299,7 +299,7 @@ void IOWatcher::Dump() {
 
     // Unix sockets don't like huge messages. TCP sockets do.
     // TODO: handle EMSGSIZE after sendmsg().
-    size_t max_to_write = unix_socket ? 8*KB : 64*KB;
+    size_t max_to_write = unix_socket ? 8*KB : 256*KB;
 
     int fd_to_send = -1;
 
@@ -312,6 +312,7 @@ void IOWatcher::Dump() {
     }
     size_t first_offset = offset;
 
+    DEBUG_PRINT("<%d> offset=%ld", io->watcher_.fd, offset);
 
     // Loop over all the buckets for this particular watcher/socket in order
     // to fill iov.
@@ -367,7 +368,7 @@ void IOWatcher::Dump() {
         Local<Value> fd_v = bucket->Get(fd_sym);
         if (fd_v->IsInt32()) {
           fd_to_send = fd_v->Int32Value();
-          DEBUG_PRINT("got fd to send: %d", fd_to_send);
+          DEBUG_PRINT("<%d> got fd to send: %d", io->watcher_.fd, fd_to_send);
           assert(fd_to_send >= 0);
         }
       }
@@ -406,7 +407,8 @@ void IOWatcher::Dump() {
         written = writev(io->watcher_.fd, iov, iovcnt);
       }
 
-      DEBUG_PRINT("iovcnt: %d, to_write: %ld, written: %ld",
+      DEBUG_PRINT("<%d> iovcnt: %d, to_write: %ld, written: %ld",
+                  io->watcher_.fd,
                   iovcnt,
                   to_write,
                   written);
@@ -415,6 +417,7 @@ void IOWatcher::Dump() {
         // Allow EAGAIN.
         // TODO: handle EMSGSIZE after sendmsg().
         if (errno == EAGAIN) {
+          DEBUG_PRINT("<%d> EAGAIN", io->watcher_.fd);
           io->Start();
         } else {
           // Emit error event
@@ -441,6 +444,7 @@ void IOWatcher::Dump() {
       // what about written == 0 ?
 
       size_t queue_size = watcher->Get(queue_size_sym)->Uint32Value();
+      DEBUG_PRINT("<%d> queue_size=%ld", io->watcher_.fd, queue_size);
       assert(queue_size >= offset);
 
       // Now drop the buckets that have been written.
@@ -475,26 +479,34 @@ void IOWatcher::Dump() {
           bucket->Set(fd_sym, Null());
         }
 
+        DEBUG_PRINT("<%d,%ld> bucket_len: %ld, offset: %ld",
+                    io->watcher_.fd,
+                    bucket_index,
+                    bucket_len,
+                    offset);
         assert(bucket_len > offset);
-        DEBUG_PRINT("[%ld] bucket_len: %ld, offset: %ld", bucket_index, bucket_len, offset);
-
-        queue_size -= written;
 
         // Only on the first bucket does is the offset > 0.
         if (offset + written < bucket_len) {
           // we have not written the entire bucket
-          DEBUG_PRINT("[%ld] Only wrote part of the buffer. "
+          DEBUG_PRINT("<%d,%ld> Only wrote part of the buffer. "
                       "setting watcher.offset = %ld",
+                      io->watcher_.fd,
                       bucket_index,
                       offset + written);
 
           watcher->Set(offset_sym,
-                           Integer::NewFromUnsigned(offset + written));
+                       Integer::NewFromUnsigned(offset + written));
           break;
         } else {
-          DEBUG_PRINT("[%ld] wrote the whole bucket. discarding.",
+          DEBUG_PRINT("<%d,%ld> wrote the whole bucket. discarding.",
+                      io->watcher_.fd,
                       bucket_index);
 
+          assert(bucket_len <= queue_size);
+          queue_size -= bucket_len;
+
+          assert(bucket_len - offset <= written);
           written -= bucket_len - offset;
 
           Local<Value> bucket_callback_v = bucket->Get(callback_sym);
@@ -519,7 +531,6 @@ void IOWatcher::Dump() {
         watcher->Set(first_bucket_sym, bucket->Get(next_sym));
       }
 
-      // Set the queue size.
       watcher->Set(queue_size_sym, Integer::NewFromUnsigned(queue_size));
     }
 
@@ -536,7 +547,7 @@ void IOWatcher::Dump() {
       // Still have buckets to be written. Wait for fd to become writable.
       io->Start();
 
-      DEBUG_PRINT("Started watcher %d", io->watcher_.fd);
+      DEBUG_PRINT("<%d> Started watcher", io->watcher_.fd);
     } else {
       // No more buckets in the queue. Make sure the last_bucket_sym is
       // updated and then go to the next watcher.
@@ -546,7 +557,7 @@ void IOWatcher::Dump() {
       // become writable.
       io->Stop();
 
-      DEBUG_PRINT("Stop watcher %d", io->watcher_.fd);
+      DEBUG_PRINT("<%d> Stop watcher", io->watcher_.fd);
 
       // Emit drain event
       if (watcher->Has(ondrain_sym)) {
index d6e7457c72de61552da004aa88ab8c172305aac0..2e510360d56b516470c19c3e387e26d476929def 100644 (file)
@@ -87,11 +87,13 @@ function test (N, b, cb) {
 
   w.firstBucket = { data: b };
   w.lastBucket = w.firstBucket;
+  w.queueSize = b.length;
 
   for (var i = 0; i < N-1; i++) {
     var bucket = { data: b };
     w.lastBucket.next = bucket;
     w.lastBucket = bucket;
+    w.queueSize += b.length;
     // Kind of randomly fill these buckets with fds.
     if (fdsSent < 5 && i % 2 == 0) {
       bucket.fd = 1; // send stdout
index af9ad870184c5f5347393defbf73dce4dd1d0163..c1597749f37ab4e7659180969687e56e3f1ff3f2 100644 (file)
@@ -49,7 +49,6 @@ function test (N, b, cb) {
   stream.readable = true;
   stream.resume();
 
-
   // Count the data as it arrives on the read end of the pipe.
   stream.on('data', function (d) {
     nread += d.length;
@@ -84,12 +83,14 @@ function test (N, b, cb) {
 
   w.firstBucket = { data: b };
   w.lastBucket = w.firstBucket;
+  w.queueSize = b.length;
 
   for (var i = 0; i < N-1; i++) {
     var bucket = { data: b };
     assert.ok(!w.lastBucket.next);
     w.lastBucket.next = bucket;
     w.lastBucket = bucket;
+    w.queueSize += b.length;
   }
 }
 
index 3341e875ca393400c6e134bf2d0463b4f38beb74..d12c2b163e872b94cd13d3dcf08c23690bd5127d 100644 (file)
@@ -93,7 +93,6 @@ function startClient () {
   req.write(buffer);
   req.end();
 
-
   console.log("request fd=%d", req.connection.fd);
 
   // note the queue includes http headers.