stream_wrap: use new slab allocator
authorBen Noordhuis <info@bnoordhuis.nl>
Thu, 29 Mar 2012 01:04:35 +0000 (03:04 +0200)
committerBen Noordhuis <info@bnoordhuis.nl>
Fri, 30 Mar 2012 19:24:02 +0000 (21:24 +0200)
src/stream_wrap.cc

index e29c317..515fa9f 100644 (file)
@@ -22,6 +22,7 @@
 #include "node.h"
 #include "node_buffer.h"
 #include "handle_wrap.h"
+#include "slab_allocator.h"
 #include "stream_wrap.h"
 #include "pipe_wrap.h"
 #include "tcp_wrap.h"
 
 #include <stdlib.h> // abort()
 
-
-namespace node {
-
-
 #define SLAB_SIZE (1024 * 1024)
-#define MIN(a, b) ((a) < (b) ? (a) : (b))
 
 
+namespace node {
+
 using v8::Object;
 using v8::Handle;
 using v8::Local;
@@ -69,26 +67,20 @@ typedef class ReqWrap<uv_shutdown_t> ShutdownWrap;
 typedef class ReqWrap<uv_write_t> WriteWrap;
 
 
-static size_t slab_used;
-static uv_stream_t* handle_that_last_alloced;
-static Persistent<String> slab_sym;
 static Persistent<String> buffer_sym;
 static Persistent<String> write_queue_size_sym;
+static SlabAllocator slab_allocator(SLAB_SIZE);
 static bool initialized;
 
 
 void StreamWrap::Initialize(Handle<Object> target) {
-  if (initialized) {
-    return;
-  } else {
-    initialized = true;
-  }
+  if (initialized) return;
+  initialized = true;
 
   HandleScope scope;
 
   HandleWrap::Initialize(target);
 
-  slab_sym = Persistent<String>::New(String::NewSymbol("slab"));
   buffer_sym = Persistent<String>::New(String::NewSymbol("buffer"));
   write_queue_size_sym =
     Persistent<String>::New(String::NewSymbol("writeQueueSize"));
@@ -152,60 +144,11 @@ Handle<Value> StreamWrap::ReadStop(const Arguments& args) {
 }
 
 
-char* StreamWrap::NewSlab(Handle<Object> global,
-                          Handle<Object> wrap_obj) {
-  HandleScope scope;
-  Local<Value> arg = Integer::NewFromUnsigned(SLAB_SIZE);
-  Local<Object> b = Buffer::constructor_template->GetFunction()->
-    NewInstance(1, &arg);
-  if (b.IsEmpty()) return NULL;
-  global->SetHiddenValue(slab_sym, b);
-  assert(Buffer::Length(b) == SLAB_SIZE);
-  slab_used = 0;
-  wrap_obj->SetHiddenValue(slab_sym, b);
-  return Buffer::Data(b);
-}
-
-
 uv_buf_t StreamWrap::OnAlloc(uv_handle_t* handle, size_t suggested_size) {
-  HandleScope scope;
-
   StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
   assert(wrap->stream_ == reinterpret_cast<uv_stream_t*>(handle));
-
-  char* slab = NULL;
-
-  Handle<Object> global = Context::GetCurrent()->Global();
-  Local<Value> slab_v = global->GetHiddenValue(slab_sym);
-
-  if (slab_v.IsEmpty()) {
-    // No slab currently. Create a new one.
-    slab = NewSlab(global, wrap->object_);
-  } else {
-    // Use existing slab.
-    Local<Object> slab_obj = slab_v->ToObject();
-    slab = Buffer::Data(slab_obj);
-    assert(Buffer::Length(slab_obj) == SLAB_SIZE);
-    assert(SLAB_SIZE >= slab_used);
-
-    // If less than 64kb is remaining on the slab allocate a new one.
-    if (SLAB_SIZE - slab_used < 64 * 1024) {
-      slab = NewSlab(global, wrap->object_);
-    } else {
-      wrap->object_->SetHiddenValue(slab_sym, slab_obj);
-    }
-  }
-
-  uv_buf_t buf;
-  buf.base = slab + slab_used;
-  buf.len = MIN(SLAB_SIZE - slab_used, suggested_size);
-
-  wrap->slab_offset_ = slab_used;
-  slab_used += buf.len;
-
-  handle_that_last_alloced = reinterpret_cast<uv_stream_t*>(handle);
-
-  return buf;
+  char* buf = slab_allocator.Allocate(wrap->object_, suggested_size);
+  return uv_buf_init(buf, suggested_size);
 }
 
 
@@ -219,34 +162,24 @@ void StreamWrap::OnReadCommon(uv_stream_t* handle, ssize_t nread,
   // uv_close() on the handle.
   assert(wrap->object_.IsEmpty() == false);
 
-  // Remove the reference to the slab to avoid memory leaks;
-  Local<Value> slab_v = wrap->object_->GetHiddenValue(slab_sym);
-  wrap->object_->SetHiddenValue(slab_sym, v8::Null());
+  Local<Object> slab = slab_allocator.Shrink(wrap->object_,
+                                             buf.base,
+                                             nread < 0 ? 0 : nread);
 
   if (nread < 0)  {
-    // EOF or Error
-    if (handle_that_last_alloced == handle) {
-      slab_used -= buf.len;
-    }
-
     SetErrno(uv_last_error(uv_default_loop()));
     MakeCallback(wrap->object_, "onread", 0, NULL);
     return;
   }
 
-  assert(static_cast<size_t>(nread) <= buf.len);
-
-  if (handle_that_last_alloced == handle) {
-    slab_used -= (buf.len - nread);
-  }
-
   if (nread == 0) return;
+  assert(static_cast<size_t>(nread) <= buf.len);
 
   int argc = 3;
   Local<Value> argv[4] = {
-    slab_v,
-    Integer::New(wrap->slab_offset_),
-    Integer::New(nread)
+    slab,
+    Integer::NewFromUnsigned(buf.base - Buffer::Data(slab)),
+    Integer::NewFromUnsigned(nread)
   };
 
   Local<Object> pending_obj;