Reference counting. Network bugs.
authorRyan <ry@tinyclouds.org>
Tue, 5 May 2009 16:11:04 +0000 (18:11 +0200)
committerRyan <ry@tinyclouds.org>
Tue, 5 May 2009 16:15:59 +0000 (18:15 +0200)
Connections were being garbage collected while they were still in progress
since the object would leave scope. This commit adds ObjectWrap::Attach()
and ObjectWrap::Detach() to tell v8 that an object is currently on the event
loop and will be needed in the future.

Other changes to oi_socket.c and net.cc are to fix bugs encountered while
running the HTTP server.

deps/liboi/oi_socket.c
src/file.cc
src/net.cc
src/net.h
src/node.cc
src/node.h
src/timer.cc

index 47643f9..df6366d 100644 (file)
 
 #if HAVE_GNUTLS
 # include <gnutls/gnutls.h>
-#endif // HAVE_GNUTLS
 
 /* a few forwards 
  * they wont even be defined if not having gnutls
  * */
 static int secure_full_goodbye (oi_socket *socket);
 static int secure_half_goodbye (oi_socket *socket);
+#endif // HAVE_GNUTLS
 
 #undef TRUE
 #define TRUE 1
@@ -99,23 +99,21 @@ oi_buf_new (const char *base, size_t len)
   return buf;
 }
 
-#define CLOSE_ASAP(socket) do {           \
-  if ((socket)->read_action) {            \
-    (socket)->read_action = full_close;   \
-  }                                       \
-  if ((socket)->write_action) {           \
-    (socket)->write_action = full_close;  \
-  }                                       \
+#define CLOSE_ASAP(socket) do {         \
+  (socket)->read_action = full_close;   \
+  (socket)->write_action = full_close;  \
 } while (0)
 
 static int 
 full_close(oi_socket *socket)
 {
+  //printf("close(%d)\n", socket->fd);
   if (close(socket->fd) == -1)
     return errno == EINTR ? AGAIN : ERROR;
 
   socket->read_action = NULL;
   socket->write_action = NULL;
+  socket->fd = -1;
   return OKAY;
 }
 
@@ -124,9 +122,17 @@ half_close(oi_socket *socket)
 {
   int r = shutdown(socket->fd, SHUT_WR);
   if (r == -1) {
-    socket->errorno = errno;
-    assert(0 && "Shouldn't get an error on shutdown");
-    return ERROR;
+    switch (errno) {
+      case ENOTCONN:
+        socket->errorno = errno;
+        return ERROR;
+
+      default:
+        perror("shutdown()");
+        socket->errorno = errno;
+        assert(0 && "Shouldn't get an error on shutdown");
+        return ERROR;
+    }
   }
   socket->write_action = NULL;
   return OKAY;
@@ -141,6 +147,9 @@ change_state_for_empty_out_stream (oi_socket *socket)
    * a very complicated bunch of close logic!
    * XXX this is awful. FIXME
    */
+  if (socket->write_action == full_close || socket->read_action == full_close)
+    return;
+
   if (socket->got_half_close == FALSE) {
     if (socket->got_full_close == FALSE) {
       /* Normal situation. Didn't get any close signals. */
@@ -244,10 +253,10 @@ secure_handshake(oi_socket *socket)
     if (socket->on_connect) socket->on_connect(socket);
   }
 
-  if (socket->read_action)
+  if (socket->read_action == secure_handshake)
     socket->read_action = secure_socket_recv;
  
-  if (socket->write_action)
+  if (socket->write_action == secure_handshake)
     socket->write_action = secure_socket_send;
 
   return OKAY;
@@ -340,8 +349,11 @@ secure_socket_recv(oi_socket *socket)
 
   if (recved >= 0) {
     /* Got EOF */
-    if (recved == 0)
+    if (recved == 0) {
       socket->read_action = NULL;
+      if (socket->write_action == NULL) 
+        CLOSE_ASAP(socket);
+    }
 
     if (socket->write_action) 
       socket->write_action = secure_socket_send;
@@ -454,6 +466,7 @@ socket_send (oi_socket *socket)
 
       default:
         perror("send()");
+        printf("%p had send error\n", socket);
         assert(0 && "oi shouldn't let this happen.");
         socket->errorno = errno;
         return ERROR;
@@ -509,7 +522,7 @@ socket_recv (oi_socket *socket)
 
       default:
         perror("recv()");
-        printf("unmatched errno %d\n", errno);
+        printf("unmatched errno %d %s\n\n", errno, strerror(errno));
         assert(0 && "recv returned error that oi should have caught before.");
         return ERROR;
     }
@@ -520,6 +533,8 @@ socket_recv (oi_socket *socket)
   if (recved == 0) {
     oi_socket_read_stop(socket);
     socket->read_action = NULL;
+    if (socket->write_action == NULL)
+      CLOSE_ASAP(socket);
   }
 
   /* NOTE: EOF is signaled with recved == 0 on callback */
@@ -860,8 +875,19 @@ oi_socket_full_close (oi_socket *socket)
 
 void oi_socket_force_close (oi_socket *socket)
 {
+  release_write_buffer(socket);
+
+  ev_clear_pending (EV_A_ &socket->write_watcher);
+  ev_clear_pending (EV_A_ &socket->read_watcher);
+  ev_clear_pending (EV_A_ &socket->timeout_watcher);
+
+  socket->write_action = socket->read_action = NULL;
   // socket->errorno = OI_SOCKET_ERROR_FORCE_CLOSE
-  CLOSE_ASAP(socket);
+  close(socket->fd);
+
+  socket->fd = -1;
+
+  oi_socket_detach(socket);
 }
 
 void 
index d43cde6..356fca7 100644 (file)
@@ -105,7 +105,7 @@ FileSystem::Rename (const Arguments& args)
   String::Utf8Value new_path(args[1]->ToString());
 
   node::eio_warmup();
-  eio_req *req = eio_rename(*path, *new_path, EIO_PRI_DEFAULT, AfterRename, NULL);
+  eio_rename(*path, *new_path, EIO_PRI_DEFAULT, AfterRename, NULL);
 
   return Undefined();
 }
@@ -132,7 +132,7 @@ FileSystem::Stat (const Arguments& args)
   String::Utf8Value path(args[0]->ToString());
 
   node::eio_warmup();
-  eio_req *req = eio_stat(*path, EIO_PRI_DEFAULT, AfterStat, NULL);
+  eio_stat(*path, EIO_PRI_DEFAULT, AfterStat, NULL);
 
   return Undefined();
 }
@@ -225,6 +225,7 @@ File::GetFD (void)
 {
   Handle<Value> fd_value = handle_->Get(FD_SYMBOL);
   int fd = fd_value->IntegerValue();
+  return fd;
 }
 
 Handle<Value>
@@ -237,8 +238,8 @@ File::Close (const Arguments& args)
   int fd = file->GetFD();
 
   node::eio_warmup();
-  eio_req *req = eio_close (fd, EIO_PRI_DEFAULT, File::AfterClose, file);
-
+  eio_close (fd, EIO_PRI_DEFAULT, File::AfterClose, file);
+  file->Attach();
   return Undefined();
 }
 
@@ -255,7 +256,7 @@ File::AfterClose (eio_req *req)
   Local<Value> argv[argc];
   argv[0] = Integer::New(req->errorno);
   CallTopCallback(file->handle_, argc, argv);
-
+  file->Detach();
   return 0;
 }
 
@@ -298,8 +299,8 @@ File::Open (const Arguments& args)
 
   // TODO how should the mode be set?
   node::eio_warmup();
-  eio_req *req = eio_open (*path, flags, 0666, EIO_PRI_DEFAULT, File::AfterOpen, file);
-
+  eio_open (*path, flags, 0666, EIO_PRI_DEFAULT, File::AfterOpen, file);
+  file->Attach();
   return Undefined();
 }
 
@@ -318,6 +319,7 @@ File::AfterOpen (eio_req *req)
   argv[0] = Integer::New(req->errorno);
   CallTopCallback(file->handle_, argc, argv);
 
+  file->Detach();
   return 0;
 }
 
@@ -346,7 +348,7 @@ File::Write (const Arguments& args)
     Local<Array> array = Local<Array>::Cast(args[0]);
     length = array->Length();
     buf = static_cast<char*>(malloc(length));
-    for (int i = 0; i < length; i++) {
+    for (unsigned int i = 0; i < length; i++) {
       Local<Value> int_value = array->Get(Integer::New(i));
       buf[i] = int_value->Int32Value();
     }
@@ -366,8 +368,9 @@ File::Write (const Arguments& args)
   int fd = file->GetFD();
 
   node::eio_warmup();
-  eio_req *req = eio_write(fd, buf, length, pos, EIO_PRI_DEFAULT, File::AfterWrite, file);
+  eio_write(fd, buf, length, pos, EIO_PRI_DEFAULT, File::AfterWrite, file);
 
+  file->Attach();
   return Undefined();
 }
 
@@ -378,7 +381,7 @@ File::AfterWrite (eio_req *req)
 
   //char *buf = static_cast<char*>(req->ptr2);
   free(req->ptr2);
-  size_t written = req->result;
+  ssize_t written = req->result;
 
   HandleScope scope;
 
@@ -388,6 +391,7 @@ File::AfterWrite (eio_req *req)
   argv[1] = written >= 0 ? Integer::New(written) : Integer::New(0);
   CallTopCallback(file->handle_, argc, argv);
 
+  file->Detach();
   return 0;
 }
 
@@ -407,9 +411,9 @@ File::Read (const Arguments& args)
 
   // NOTE: NULL pointer tells eio to allocate it itself
   node::eio_warmup();
-  eio_req *req = eio_read(fd, NULL, length, pos, EIO_PRI_DEFAULT, File::AfterRead, file);
-  assert(req);
+  eio_read(fd, NULL, length, pos, EIO_PRI_DEFAULT, File::AfterRead, file);
 
+  file->Attach();
   return Undefined();
 }
 
@@ -436,13 +440,15 @@ File::AfterRead (eio_req *req)
     } else {
       // raw encoding
       Local<Array> array = Array::New(length);
-      for (int i = 0; i < length; i++) {
+      for (unsigned int i = 0; i < length; i++) {
         array->Set(Integer::New(i), Integer::New(buf[i]));
       }
       argv[1] = array;
     }
   }
   CallTopCallback(file->handle_, argc, argv);
+
+  file->Detach();
   return 0;
 }
 
index 20119b0..dfdf45d 100644 (file)
@@ -105,8 +105,13 @@ Connection::Connection (Handle<Object> handle, Handle<Function> protocol_class)
 
 Connection::~Connection ()
 {
-  handle_->Delete(SEND_SYMBOL);
-  Close();
+  static int i = 0;
+  if(socket_.fd > 0) {
+    printf("garbage collecting open Connection : %d\n", i++);
+    printf("  socket->read_action: %p\n", socket_.read_action);
+    printf("  socket->write_action: %p\n", socket_.write_action);
+  }
+  ForceClose();
 }
 
 Local<Object>
@@ -128,6 +133,8 @@ Connection::SetAcceptor (Handle<Object> acceptor_handle)
 {
   HandleScope scope;
   handle_->Set(SERVER_SYMBOL, acceptor_handle);
+  
+  Attach();
 }
 
 Handle<Value>
@@ -172,6 +179,9 @@ Connection::v8Connect (const Arguments& args)
             , Connection::AfterResolve
             , connection
             );
+
+  connection->Attach();
+
   return Undefined();
 }
 
@@ -214,6 +224,7 @@ Connection::AfterResolve (eio_req *req)
   }
 
   connection->OnDisconnect();
+  connection->Detach();
 
   return 0;
 }
@@ -242,6 +253,7 @@ Connection::v8ForceClose (const Arguments& args)
   HandleScope scope;
   Connection *connection = NODE_UNWRAP(Connection, args.Holder());
   connection->ForceClose();
+  //connection->Detach();
   return Undefined();
 }
 
index e4b03a5..cf6fc7d 100644 (file)
--- a/src/net.h
+++ b/src/net.h
@@ -69,6 +69,7 @@ private:
   static void on_close (oi_socket *s) {
     Connection *connection = static_cast<Connection*> (s->data);
     connection->OnDisconnect();
+    connection->Detach();
   }
 
   static void on_timeout (oi_socket *s) {
@@ -98,7 +99,7 @@ protected:
   Acceptor (v8::Handle<v8::Object> handle, 
             v8::Handle<v8::Function> protocol_class, 
             v8::Handle<v8::Object> options);
-  virtual ~Acceptor () { Close(); }
+  virtual ~Acceptor () { Close(); puts("acceptor gc'd!");}
 
   v8::Local<v8::Function> GetProtocolClass (void);
 
@@ -106,11 +107,13 @@ protected:
     int r = oi_server_listen (&server_, address); 
     if(r != 0) return r;
     oi_server_attach (EV_DEFAULT_ &server_); 
+    Attach();
     return 0;
   }
 
   void Close ( ) { 
     oi_server_close (&server_); 
+    Detach();
   }
 
   virtual Connection* OnConnection (struct sockaddr *addr, socklen_t len);
index 4b8863e..404b2aa 100644 (file)
@@ -22,11 +22,9 @@ static int exit_code = 0;
 
 ObjectWrap::~ObjectWrap ( )
 {
-  if (!handle_.IsEmpty())  {
-    handle_->SetInternalField(0, Undefined());
-    handle_.Dispose();
-    handle_.Clear(); 
-  }
+  handle_->SetInternalField(0, Undefined());
+  handle_.Dispose();
+  handle_.Clear(); 
 }
 
 ObjectWrap::ObjectWrap (Handle<Object> handle)
@@ -38,6 +36,25 @@ ObjectWrap::ObjectWrap (Handle<Object> handle)
   Handle<External> external = External::New(this);
   handle_->SetInternalField(0, external);
   handle_.MakeWeak(this, ObjectWrap::MakeWeak);
+
+  attach_count_ = 0;
+  weak_ = false;
+}
+
+void
+ObjectWrap::Attach ()
+{
+  attach_count_ += 1;
+}
+
+void
+ObjectWrap::Detach ()
+{
+  attach_count_ -= 1;
+  assert(attach_count_ >= 0);
+
+  if(weak_ && attach_count_ == 0)
+    delete this;
 }
 
 void*
@@ -52,8 +69,10 @@ ObjectWrap::Unwrap (v8::Handle<v8::Object> handle)
 void
 ObjectWrap::MakeWeak (Persistent<Value> _, void *data)
 {
-  ObjectWrap *w = static_cast<ObjectWrap*> (data);
-  delete w;
+  ObjectWrap *obj = static_cast<ObjectWrap*> (data);
+  obj->weak_ = true;
+  if (obj->attach_count_ == 0)
+    delete obj;
 }
 
 // Extracts a C string from a V8 Utf8Value.
index d7e79e9..cea6886 100644 (file)
@@ -26,8 +26,24 @@ protected:
   static void* Unwrap (v8::Handle<v8::Object> handle);
   v8::Persistent<v8::Object> handle_;
 
+  /* Attach() marks the object as being attached to an event loop.
+   * Attached objects will not be garbage collected, even if
+   * all references are lost.
+   */
+  void Attach();
+  /* Detach() marks an object as detached from the event loop.  This is its
+   * default state.  When an object with a "weak" reference changes from
+   * attached to detached state it will be freed. Be careful not to access
+   * the object after making this call as it might be gone!
+   * (A "weak reference" is v8 terminology for an object who only has a
+   * persistant handle.)
+   */
+  void Detach();
+
 private:
   static void MakeWeak (v8::Persistent<v8::Value> _, void *data);
+  int attach_count_;
+  bool weak_;
 };
 
 } // namespace node
index 1e6f0bb..21553f1 100644 (file)
@@ -3,8 +3,9 @@
 #include <assert.h>
 
 using namespace v8;
+using namespace node;
 
-class Timer : node::ObjectWrap {
+class Timer : ObjectWrap {
  public:
   Timer(Handle<Object> handle, Handle<Function> callback, ev_tstamp after, ev_tstamp repeat);
   ~Timer();
@@ -34,7 +35,14 @@ Timer::OnTimeout (EV_P_ ev_timer *watcher, int revents)
   TryCatch try_catch;
   callback->Call (timer->handle_, 0, NULL);
   if (try_catch.HasCaught())
-    node::fatal_exception(try_catch);
+    fatal_exception(try_catch);
+
+  /* XXX i'm a bit worried if this is the correct test? 
+   * it's rather crutial for memory leaks the conditional here test to see
+   * if the watcher will make another callback.
+   */ 
+  if (false == ev_is_active(&timer->watcher_))
+    timer->Detach();
 }
 
 Timer::Timer (Handle<Object> handle, Handle<Function> callback, ev_tstamp after, ev_tstamp repeat)
@@ -48,6 +56,7 @@ Timer::Timer (Handle<Object> handle, Handle<Function> callback, ev_tstamp after,
   watcher_.data = this;
 
   ev_timer_start(EV_DEFAULT_UC_ &watcher_);
+  Attach();
 }
 
 Timer::~Timer ()
@@ -78,6 +87,7 @@ Timer::Start (const Arguments& args)
 {
   Timer *timer = NODE_UNWRAP(Timer, args.Holder());
   ev_timer_start(EV_DEFAULT_UC_ &timer->watcher_);
+  timer->Attach();
   return Undefined();
 }
 
@@ -86,6 +96,7 @@ Timer::Stop (const Arguments& args)
 {
   Timer *timer = NODE_UNWRAP(Timer, args.Holder());
   ev_timer_stop(EV_DEFAULT_UC_ &timer->watcher_);
+  timer->Detach();
   return Undefined();
 }