src: deduplicate CHECK_EQ/CHECK_NE macros
[platform/upstream/nodejs.git] / src / stream_wrap.cc
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 #include "stream_wrap.h"
23 #include "env-inl.h"
24 #include "env.h"
25 #include "handle_wrap.h"
26 #include "node_buffer.h"
27 #include "node_counters.h"
28 #include "pipe_wrap.h"
29 #include "req_wrap.h"
30 #include "tcp_wrap.h"
31 #include "udp_wrap.h"
32 #include "util.h"
33 #include "util-inl.h"
34
35 #include <stdlib.h>  // abort()
36 #include <string.h>  // memcpy()
37 #include <limits.h>  // INT_MAX
38
39
40 namespace node {
41
42 using v8::Array;
43 using v8::Context;
44 using v8::EscapableHandleScope;
45 using v8::FunctionCallbackInfo;
46 using v8::Handle;
47 using v8::HandleScope;
48 using v8::Integer;
49 using v8::Local;
50 using v8::Number;
51 using v8::Object;
52 using v8::PropertyCallbackInfo;
53 using v8::String;
54 using v8::True;
55 using v8::Undefined;
56 using v8::Value;
57
58
59 StreamWrap::StreamWrap(Environment* env,
60                        Local<Object> object,
61                        uv_stream_t* stream,
62                        AsyncWrap::ProviderType provider)
63     : HandleWrap(env, object, reinterpret_cast<uv_handle_t*>(stream), provider),
64       stream_(stream),
65       default_callbacks_(this),
66       callbacks_(&default_callbacks_) {
67 }
68
69
70 void StreamWrap::GetFD(Local<String>, const PropertyCallbackInfo<Value>& args) {
71 #if !defined(_WIN32)
72   Environment* env = Environment::GetCurrent(args.GetIsolate());
73   HandleScope scope(env->isolate());
74   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
75   int fd = -1;
76   if (wrap != NULL && wrap->stream() != NULL) {
77     fd = wrap->stream()->io_watcher.fd;
78   }
79   args.GetReturnValue().Set(fd);
80 #endif
81 }
82
83
84 void StreamWrap::UpdateWriteQueueSize() {
85   HandleScope scope(env()->isolate());
86   Local<Integer> write_queue_size =
87       Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
88   object()->Set(env()->write_queue_size_string(), write_queue_size);
89 }
90
91 void StreamWrap::ReadStart(const FunctionCallbackInfo<Value>& args) {
92   Environment* env = Environment::GetCurrent(args.GetIsolate());
93   HandleScope scope(env->isolate());
94
95   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
96
97   int err = uv_read_start(wrap->stream(), OnAlloc, OnRead);
98
99   args.GetReturnValue().Set(err);
100 }
101
102
103 void StreamWrap::ReadStop(const FunctionCallbackInfo<Value>& args) {
104   Environment* env = Environment::GetCurrent(args.GetIsolate());
105   HandleScope scope(env->isolate());
106
107   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
108
109   int err = uv_read_stop(wrap->stream());
110   args.GetReturnValue().Set(err);
111 }
112
113
114 void StreamWrap::OnAlloc(uv_handle_t* handle,
115                          size_t suggested_size,
116                          uv_buf_t* buf) {
117   StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
118   assert(wrap->stream() == reinterpret_cast<uv_stream_t*>(handle));
119   wrap->callbacks()->DoAlloc(handle, suggested_size, buf);
120 }
121
122
123 template <class WrapType, class UVType>
124 static Local<Object> AcceptHandle(Environment* env, uv_stream_t* pipe) {
125   EscapableHandleScope scope(env->isolate());
126   Local<Object> wrap_obj;
127   UVType* handle;
128
129   wrap_obj = WrapType::Instantiate(env);
130   if (wrap_obj.IsEmpty())
131     return Local<Object>();
132
133   WrapType* wrap = Unwrap<WrapType>(wrap_obj);
134   handle = wrap->UVHandle();
135
136   if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
137     abort();
138
139   return scope.Escape(wrap_obj);
140 }
141
142
143 void StreamWrap::OnReadCommon(uv_stream_t* handle,
144                               ssize_t nread,
145                               const uv_buf_t* buf,
146                               uv_handle_type pending) {
147   StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
148
149   // We should not be getting this callback if someone as already called
150   // uv_close() on the handle.
151   assert(wrap->persistent().IsEmpty() == false);
152
153   if (nread > 0) {
154     if (wrap->is_tcp()) {
155       NODE_COUNT_NET_BYTES_RECV(nread);
156     } else if (wrap->is_named_pipe()) {
157       NODE_COUNT_PIPE_BYTES_RECV(nread);
158     }
159   }
160
161   wrap->callbacks()->DoRead(handle, nread, buf, pending);
162 }
163
164
165 void StreamWrap::OnRead(uv_stream_t* handle,
166                         ssize_t nread,
167                         const uv_buf_t* buf) {
168   StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
169   uv_handle_type type = UV_UNKNOWN_HANDLE;
170
171   if (wrap->is_named_pipe_ipc() &&
172       uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
173     type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
174   }
175
176   OnReadCommon(handle, nread, buf, type);
177 }
178
179
180 size_t StreamWrap::WriteBuffer(Handle<Value> val, uv_buf_t* buf) {
181   assert(Buffer::HasInstance(val));
182
183   // Simple non-writev case
184   buf->base = Buffer::Data(val);
185   buf->len = Buffer::Length(val);
186
187   return buf->len;
188 }
189
190
191 void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
192   HandleScope handle_scope(args.GetIsolate());
193   Environment* env = Environment::GetCurrent(args.GetIsolate());
194
195   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
196
197   assert(args[0]->IsObject());
198   assert(Buffer::HasInstance(args[1]));
199
200   Local<Object> req_wrap_obj = args[0].As<Object>();
201   Local<Object> buf_obj = args[1].As<Object>();
202
203   size_t length = Buffer::Length(buf_obj);
204
205   char* storage;
206   WriteWrap* req_wrap;
207   uv_buf_t buf;
208   WriteBuffer(buf_obj, &buf);
209
210   // Try writing immediately without allocation
211   uv_buf_t* bufs = &buf;
212   size_t count = 1;
213   int err = wrap->callbacks()->TryWrite(&bufs, &count);
214   if (err != 0)
215     goto done;
216   if (count == 0)
217     goto done;
218   assert(count == 1);
219
220   // Allocate, or write rest
221   storage = new char[sizeof(WriteWrap)];
222   req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
223
224   err = wrap->callbacks()->DoWrite(req_wrap,
225                                    bufs,
226                                    count,
227                                    NULL,
228                                    StreamWrap::AfterWrite);
229   req_wrap->Dispatched();
230   req_wrap_obj->Set(env->async(), True(env->isolate()));
231
232   if (err) {
233     req_wrap->~WriteWrap();
234     delete[] storage;
235   }
236
237  done:
238   const char* msg = wrap->callbacks()->Error();
239   if (msg != NULL)
240     req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
241   req_wrap_obj->Set(env->bytes_string(),
242                     Integer::NewFromUnsigned(env->isolate(), length));
243   args.GetReturnValue().Set(err);
244 }
245
246
247 template <enum encoding encoding>
248 void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
249   HandleScope handle_scope(args.GetIsolate());
250   Environment* env = Environment::GetCurrent(args.GetIsolate());
251   int err;
252
253   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
254
255   assert(args[0]->IsObject());
256   assert(args[1]->IsString());
257
258   Local<Object> req_wrap_obj = args[0].As<Object>();
259   Local<String> string = args[1].As<String>();
260
261   // Compute the size of the storage that the string will be flattened into.
262   // For UTF8 strings that are very long, go ahead and take the hit for
263   // computing their actual size, rather than tripling the storage.
264   size_t storage_size;
265   if (encoding == UTF8 && string->Length() > 65535)
266     storage_size = StringBytes::Size(env->isolate(), string, encoding);
267   else
268     storage_size = StringBytes::StorageSize(env->isolate(), string, encoding);
269
270   if (storage_size > INT_MAX) {
271     args.GetReturnValue().Set(UV_ENOBUFS);
272     return;
273   }
274
275   // Try writing immediately if write size isn't too big
276   char* storage;
277   WriteWrap* req_wrap;
278   char* data;
279   char stack_storage[16384];  // 16kb
280   size_t data_size;
281   uv_buf_t buf;
282
283   bool try_write = storage_size + 15 <= sizeof(stack_storage) &&
284                    (!wrap->is_named_pipe_ipc() || !args[2]->IsObject());
285   if (try_write) {
286     data_size = StringBytes::Write(env->isolate(),
287                                    stack_storage,
288                                    storage_size,
289                                    string,
290                                    encoding);
291     buf = uv_buf_init(stack_storage, data_size);
292
293     uv_buf_t* bufs = &buf;
294     size_t count = 1;
295     err = wrap->callbacks()->TryWrite(&bufs, &count);
296
297     // Failure
298     if (err != 0)
299       goto done;
300
301     // Success
302     if (count == 0)
303       goto done;
304
305     // Partial write
306     assert(count == 1);
307   }
308
309   storage = new char[sizeof(WriteWrap) + storage_size + 15];
310   req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
311
312   data = reinterpret_cast<char*>(ROUND_UP(
313       reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
314
315   if (try_write) {
316     // Copy partial data
317     memcpy(data, buf.base, buf.len);
318     data_size = buf.len;
319   } else {
320     // Write it
321     data_size = StringBytes::Write(env->isolate(),
322                                    data,
323                                    storage_size,
324                                    string,
325                                    encoding);
326   }
327
328   assert(data_size <= storage_size);
329
330   buf = uv_buf_init(data, data_size);
331
332   if (!wrap->is_named_pipe_ipc()) {
333     err = wrap->callbacks()->DoWrite(req_wrap,
334                                      &buf,
335                                      1,
336                                      NULL,
337                                      StreamWrap::AfterWrite);
338   } else {
339     uv_handle_t* send_handle = NULL;
340
341     if (args[2]->IsObject()) {
342       Local<Object> send_handle_obj = args[2].As<Object>();
343       HandleWrap* wrap = Unwrap<HandleWrap>(send_handle_obj);
344       send_handle = wrap->GetHandle();
345       // Reference StreamWrap instance to prevent it from being garbage
346       // collected before `AfterWrite` is called.
347       assert(!req_wrap->persistent().IsEmpty());
348       req_wrap->object()->Set(env->handle_string(), send_handle_obj);
349     }
350
351     err = wrap->callbacks()->DoWrite(
352         req_wrap,
353         &buf,
354         1,
355         reinterpret_cast<uv_stream_t*>(send_handle),
356         StreamWrap::AfterWrite);
357   }
358
359   req_wrap->Dispatched();
360   req_wrap->object()->Set(env->async(), True(env->isolate()));
361
362   if (err) {
363     req_wrap->~WriteWrap();
364     delete[] storage;
365   }
366
367  done:
368   const char* msg = wrap->callbacks()->Error();
369   if (msg != NULL)
370     req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
371   req_wrap_obj->Set(env->bytes_string(),
372                     Integer::NewFromUnsigned(env->isolate(), data_size));
373   args.GetReturnValue().Set(err);
374 }
375
376
377 void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
378   HandleScope handle_scope(args.GetIsolate());
379   Environment* env = Environment::GetCurrent(args.GetIsolate());
380
381   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
382
383   assert(args[0]->IsObject());
384   assert(args[1]->IsArray());
385
386   Local<Object> req_wrap_obj = args[0].As<Object>();
387   Local<Array> chunks = args[1].As<Array>();
388   size_t count = chunks->Length() >> 1;
389
390   uv_buf_t bufs_[16];
391   uv_buf_t* bufs = bufs_;
392
393   // Determine storage size first
394   size_t storage_size = 0;
395   for (size_t i = 0; i < count; i++) {
396     Handle<Value> chunk = chunks->Get(i * 2);
397
398     if (Buffer::HasInstance(chunk))
399       continue;
400       // Buffer chunk, no additional storage required
401
402     // String chunk
403     Handle<String> string = chunk->ToString();
404     enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
405     size_t chunk_size;
406     if (encoding == UTF8 && string->Length() > 65535)
407       chunk_size = StringBytes::Size(env->isolate(), string, encoding);
408     else
409       chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
410
411     storage_size += chunk_size + 15;
412   }
413
414   if (storage_size > INT_MAX) {
415     args.GetReturnValue().Set(UV_ENOBUFS);
416     return;
417   }
418
419   if (ARRAY_SIZE(bufs_) < count)
420     bufs = new uv_buf_t[count];
421
422   storage_size += sizeof(WriteWrap);
423   char* storage = new char[storage_size];
424   WriteWrap* req_wrap =
425       new(storage) WriteWrap(env, req_wrap_obj, wrap);
426
427   uint32_t bytes = 0;
428   size_t offset = sizeof(WriteWrap);
429   for (size_t i = 0; i < count; i++) {
430     Handle<Value> chunk = chunks->Get(i * 2);
431
432     // Write buffer
433     if (Buffer::HasInstance(chunk)) {
434       bufs[i].base = Buffer::Data(chunk);
435       bufs[i].len = Buffer::Length(chunk);
436       bytes += bufs[i].len;
437       continue;
438     }
439
440     // Write string
441     offset = ROUND_UP(offset, 16);
442     assert(offset < storage_size);
443     char* str_storage = storage + offset;
444     size_t str_size = storage_size - offset;
445
446     Handle<String> string = chunk->ToString();
447     enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
448     str_size = StringBytes::Write(env->isolate(),
449                                   str_storage,
450                                   str_size,
451                                   string,
452                                   encoding);
453     bufs[i].base = str_storage;
454     bufs[i].len = str_size;
455     offset += str_size;
456     bytes += str_size;
457   }
458
459   int err = wrap->callbacks()->DoWrite(req_wrap,
460                                        bufs,
461                                        count,
462                                        NULL,
463                                        StreamWrap::AfterWrite);
464
465   // Deallocate space
466   if (bufs != bufs_)
467     delete[] bufs;
468
469   req_wrap->Dispatched();
470   req_wrap->object()->Set(env->async(), True(env->isolate()));
471   req_wrap->object()->Set(env->bytes_string(),
472                           Number::New(env->isolate(), bytes));
473   const char* msg = wrap->callbacks()->Error();
474   if (msg != NULL)
475     req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
476
477   if (err) {
478     req_wrap->~WriteWrap();
479     delete[] storage;
480   }
481
482   args.GetReturnValue().Set(err);
483 }
484
485
486 void StreamWrap::WriteAsciiString(const FunctionCallbackInfo<Value>& args) {
487   WriteStringImpl<ASCII>(args);
488 }
489
490
491 void StreamWrap::WriteUtf8String(const FunctionCallbackInfo<Value>& args) {
492   WriteStringImpl<UTF8>(args);
493 }
494
495
496 void StreamWrap::WriteUcs2String(const FunctionCallbackInfo<Value>& args) {
497   WriteStringImpl<UCS2>(args);
498 }
499
500 void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
501   Environment* env = Environment::GetCurrent(args.GetIsolate());
502   HandleScope scope(env->isolate());
503
504   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
505
506   assert(args.Length() > 0);
507   int err = uv_stream_set_blocking(wrap->stream(), args[0]->IsTrue());
508
509   args.GetReturnValue().Set(err);
510 }
511
512 void StreamWrap::AfterWrite(uv_write_t* req, int status) {
513   WriteWrap* req_wrap = CONTAINER_OF(req, WriteWrap, req_);
514   StreamWrap* wrap = req_wrap->wrap();
515   Environment* env = wrap->env();
516
517   HandleScope handle_scope(env->isolate());
518   Context::Scope context_scope(env->context());
519
520   // The wrap and request objects should still be there.
521   assert(req_wrap->persistent().IsEmpty() == false);
522   assert(wrap->persistent().IsEmpty() == false);
523
524   // Unref handle property
525   Local<Object> req_wrap_obj = req_wrap->object();
526   req_wrap_obj->Delete(env->handle_string());
527   wrap->callbacks()->AfterWrite(req_wrap);
528
529   Local<Value> argv[] = {
530     Integer::New(env->isolate(), status),
531     wrap->object(),
532     req_wrap_obj,
533     Undefined(env->isolate())
534   };
535
536   const char* msg = wrap->callbacks()->Error();
537   if (msg != NULL)
538     argv[3] = OneByteString(env->isolate(), msg);
539
540   req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
541
542   req_wrap->~WriteWrap();
543   delete[] reinterpret_cast<char*>(req_wrap);
544 }
545
546
547 void StreamWrap::Shutdown(const FunctionCallbackInfo<Value>& args) {
548   HandleScope handle_scope(args.GetIsolate());
549   Environment* env = Environment::GetCurrent(args.GetIsolate());
550
551   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
552
553   assert(args[0]->IsObject());
554   Local<Object> req_wrap_obj = args[0].As<Object>();
555
556   ShutdownWrap* req_wrap = new ShutdownWrap(env,
557                                             req_wrap_obj,
558                                             AsyncWrap::PROVIDER_SHUTDOWNWRAP);
559   int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown);
560   req_wrap->Dispatched();
561   if (err)
562     delete req_wrap;
563   args.GetReturnValue().Set(err);
564 }
565
566
567 void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
568   ShutdownWrap* req_wrap = static_cast<ShutdownWrap*>(req->data);
569   StreamWrap* wrap = static_cast<StreamWrap*>(req->handle->data);
570   Environment* env = wrap->env();
571
572   // The wrap and request objects should still be there.
573   assert(req_wrap->persistent().IsEmpty() == false);
574   assert(wrap->persistent().IsEmpty() == false);
575
576   HandleScope handle_scope(env->isolate());
577   Context::Scope context_scope(env->context());
578
579   Local<Object> req_wrap_obj = req_wrap->object();
580   Local<Value> argv[3] = {
581     Integer::New(env->isolate(), status),
582     wrap->object(),
583     req_wrap_obj
584   };
585
586   req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
587
588   delete req_wrap;
589 }
590
591
592 const char* StreamWrapCallbacks::Error() {
593   return NULL;
594 }
595
596
597 // NOTE: Call to this function could change both `buf`'s and `count`'s
598 // values, shifting their base and decrementing their length. This is
599 // required in order to skip the data that was successfully written via
600 // uv_try_write().
601 int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
602   int err;
603   size_t written;
604   uv_buf_t* vbufs = *bufs;
605   size_t vcount = *count;
606
607   err = uv_try_write(wrap()->stream(), vbufs, vcount);
608   if (err == UV_ENOSYS)
609     return 0;
610   if (err < 0)
611     return err;
612
613   // Slice off the buffers: skip all written buffers and slice the one that
614   // was partially written.
615   written = err;
616   for (; written != 0 && vcount > 0; vbufs++, vcount--) {
617     // Slice
618     if (vbufs[0].len > written) {
619       vbufs[0].base += written;
620       vbufs[0].len -= written;
621       written = 0;
622       break;
623
624     // Discard
625     } else {
626       written -= vbufs[0].len;
627     }
628   }
629
630   *bufs = vbufs;
631   *count = vcount;
632
633   return 0;
634 }
635
636
637 int StreamWrapCallbacks::DoWrite(WriteWrap* w,
638                                  uv_buf_t* bufs,
639                                  size_t count,
640                                  uv_stream_t* send_handle,
641                                  uv_write_cb cb) {
642   int r;
643   if (send_handle == NULL) {
644     r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
645   } else {
646     r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb);
647   }
648
649   if (!r) {
650     size_t bytes = 0;
651     for (size_t i = 0; i < count; i++)
652       bytes += bufs[i].len;
653     if (wrap()->stream()->type == UV_TCP) {
654       NODE_COUNT_NET_BYTES_SENT(bytes);
655     } else if (wrap()->stream()->type == UV_NAMED_PIPE) {
656       NODE_COUNT_PIPE_BYTES_SENT(bytes);
657     }
658   }
659
660   wrap()->UpdateWriteQueueSize();
661
662   return r;
663 }
664
665
666 void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
667   wrap()->UpdateWriteQueueSize();
668 }
669
670
671 void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
672                                   size_t suggested_size,
673                                   uv_buf_t* buf) {
674   buf->base = static_cast<char*>(malloc(suggested_size));
675   buf->len = suggested_size;
676
677   if (buf->base == NULL && suggested_size > 0) {
678     FatalError(
679         "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)",
680         "Out Of Memory");
681   }
682 }
683
684
685 void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
686                                  ssize_t nread,
687                                  const uv_buf_t* buf,
688                                  uv_handle_type pending) {
689   Environment* env = wrap()->env();
690   HandleScope handle_scope(env->isolate());
691   Context::Scope context_scope(env->context());
692
693   Local<Value> argv[] = {
694     Integer::New(env->isolate(), nread),
695     Undefined(env->isolate()),
696     Undefined(env->isolate())
697   };
698
699   if (nread < 0)  {
700     if (buf->base != NULL)
701       free(buf->base);
702     wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
703     return;
704   }
705
706   if (nread == 0) {
707     if (buf->base != NULL)
708       free(buf->base);
709     return;
710   }
711
712   char* base = static_cast<char*>(realloc(buf->base, nread));
713   assert(static_cast<size_t>(nread) <= buf->len);
714   argv[1] = Buffer::Use(env, base, nread);
715
716   Local<Object> pending_obj;
717   if (pending == UV_TCP) {
718     pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, handle);
719   } else if (pending == UV_NAMED_PIPE) {
720     pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, handle);
721   } else if (pending == UV_UDP) {
722     pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, handle);
723   } else {
724     assert(pending == UV_UNKNOWN_HANDLE);
725   }
726
727   if (!pending_obj.IsEmpty()) {
728     argv[2] = pending_obj;
729   }
730
731   wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
732 }
733
734
735 int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
736   return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb);
737 }
738
739 }  // namespace node