49a2f90f46f0cf57d387703cc88ed7021efa9b9e
[platform/upstream/nodejs.git] / src / stream_wrap.cc
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 #include "stream_wrap.h"
23 #include "env-inl.h"
24 #include "env.h"
25 #include "handle_wrap.h"
26 #include "node_buffer.h"
27 #include "node_counters.h"
28 #include "pipe_wrap.h"
29 #include "req_wrap.h"
30 #include "tcp_wrap.h"
31 #include "udp_wrap.h"
32
33 #include <stdlib.h>  // abort()
34 #include <limits.h>  // INT_MAX
35
36
37 namespace node {
38
39 using v8::Array;
40 using v8::Context;
41 using v8::FunctionCallbackInfo;
42 using v8::Handle;
43 using v8::HandleScope;
44 using v8::Integer;
45 using v8::Local;
46 using v8::Number;
47 using v8::Object;
48 using v8::PropertyCallbackInfo;
49 using v8::String;
50 using v8::Undefined;
51 using v8::Value;
52
53
54 StreamWrap::StreamWrap(Environment* env,
55                        Local<Object> object,
56                        uv_stream_t* stream)
57     : HandleWrap(env, object, reinterpret_cast<uv_handle_t*>(stream))
58     , stream_(stream)
59     , default_callbacks_(this)
60     , callbacks_(&default_callbacks_) {
61 }
62
63
64 void StreamWrap::GetFD(Local<String>, const PropertyCallbackInfo<Value>& args) {
65 #if !defined(_WIN32)
66   HandleScope scope(node_isolate);
67   StreamWrap* wrap;
68   NODE_UNWRAP_NO_ABORT(args.This(), StreamWrap, wrap);
69   int fd = -1;
70   if (wrap != NULL && wrap->stream() != NULL) {
71     fd = wrap->stream()->io_watcher.fd;
72   }
73   args.GetReturnValue().Set(fd);
74 #endif
75 }
76
77
78 void StreamWrap::UpdateWriteQueueSize() {
79   HandleScope scope(node_isolate);
80   Local<Integer> write_queue_size =
81       Integer::NewFromUnsigned(stream()->write_queue_size, node_isolate);
82   object()->Set(env()->write_queue_size_string(), write_queue_size);
83 }
84
85
86 void StreamWrap::ReadStart(const FunctionCallbackInfo<Value>& args) {
87   HandleScope scope(node_isolate);
88
89   StreamWrap* wrap;
90   NODE_UNWRAP(args.This(), StreamWrap, wrap);
91
92   int err;
93   if (wrap->is_named_pipe_ipc()) {
94     err = uv_read2_start(wrap->stream(), OnAlloc, OnRead2);
95   } else {
96     err = uv_read_start(wrap->stream(), OnAlloc, OnRead);
97   }
98
99   args.GetReturnValue().Set(err);
100 }
101
102
103 void StreamWrap::ReadStop(const FunctionCallbackInfo<Value>& args) {
104   HandleScope scope(node_isolate);
105
106   StreamWrap* wrap;
107   NODE_UNWRAP(args.This(), StreamWrap, wrap);
108
109   int err = uv_read_stop(wrap->stream());
110   args.GetReturnValue().Set(err);
111 }
112
113
114 void StreamWrap::OnAlloc(uv_handle_t* handle,
115                          size_t suggested_size,
116                          uv_buf_t* buf) {
117   StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
118   assert(wrap->stream() == reinterpret_cast<uv_stream_t*>(handle));
119   wrap->callbacks()->DoAlloc(handle, suggested_size, buf);
120 }
121
122
123 template <class WrapType, class UVType>
124 static Local<Object> AcceptHandle(Environment* env, uv_stream_t* pipe) {
125   HandleScope scope(node_isolate);
126   Local<Object> wrap_obj;
127   UVType* handle;
128
129   wrap_obj = WrapType::Instantiate(env);
130   if (wrap_obj.IsEmpty())
131     return Local<Object>();
132
133   WrapType* wrap;
134   NODE_UNWRAP(wrap_obj, WrapType, wrap);
135   handle = wrap->UVHandle();
136
137   if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
138     abort();
139
140   return scope.Close(wrap_obj);
141 }
142
143
144 void StreamWrap::OnReadCommon(uv_stream_t* handle,
145                               ssize_t nread,
146                               const uv_buf_t* buf,
147                               uv_handle_type pending) {
148   HandleScope scope(node_isolate);
149
150   StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
151
152   // We should not be getting this callback if someone as already called
153   // uv_close() on the handle.
154   assert(wrap->persistent().IsEmpty() == false);
155
156   if (nread > 0) {
157     if (wrap->is_tcp()) {
158       NODE_COUNT_NET_BYTES_RECV(nread);
159     } else if (wrap->is_named_pipe()) {
160       NODE_COUNT_PIPE_BYTES_RECV(nread);
161     }
162   }
163
164   wrap->callbacks()->DoRead(handle, nread, buf, pending);
165 }
166
167
168 void StreamWrap::OnRead(uv_stream_t* handle,
169                         ssize_t nread,
170                         const uv_buf_t* buf) {
171   OnReadCommon(handle, nread, buf, UV_UNKNOWN_HANDLE);
172 }
173
174
175 void StreamWrap::OnRead2(uv_pipe_t* handle,
176                          ssize_t nread,
177                          const uv_buf_t* buf,
178                          uv_handle_type pending) {
179   OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending);
180 }
181
182
183 size_t StreamWrap::WriteBuffer(Handle<Value> val, uv_buf_t* buf) {
184   assert(Buffer::HasInstance(val));
185
186   // Simple non-writev case
187   buf->base = Buffer::Data(val);
188   buf->len = Buffer::Length(val);
189
190   return buf->len;
191 }
192
193
194 void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
195   Environment* env = Environment::GetCurrent(args.GetIsolate());
196   HandleScope handle_scope(args.GetIsolate());
197
198   StreamWrap* wrap;
199   NODE_UNWRAP(args.This(), StreamWrap, wrap);
200
201   assert(args[0]->IsObject());
202   assert(Buffer::HasInstance(args[1]));
203
204   Local<Object> req_wrap_obj = args[0].As<Object>();
205   Local<Object> buf_obj = args[1].As<Object>();
206
207   size_t length = Buffer::Length(buf_obj);
208   char* storage = new char[sizeof(WriteWrap)];
209   WriteWrap* req_wrap =
210       new(storage) WriteWrap(env, req_wrap_obj, wrap);
211
212   uv_buf_t buf;
213   WriteBuffer(buf_obj, &buf);
214
215   int err = wrap->callbacks()->DoWrite(req_wrap,
216                                        &buf,
217                                        1,
218                                        NULL,
219                                        StreamWrap::AfterWrite);
220   req_wrap->Dispatched();
221   req_wrap_obj->Set(env->bytes_string(),
222                     Integer::NewFromUnsigned(length, node_isolate));
223
224   if (err) {
225     req_wrap->~WriteWrap();
226     delete[] storage;
227   }
228
229   args.GetReturnValue().Set(err);
230 }
231
232
233 template <enum encoding encoding>
234 void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
235   Environment* env = Environment::GetCurrent(args.GetIsolate());
236   HandleScope handle_scope(args.GetIsolate());
237   int err;
238
239   StreamWrap* wrap;
240   NODE_UNWRAP(args.This(), StreamWrap, wrap);
241
242   assert(args[0]->IsObject());
243   assert(args[1]->IsString());
244
245   Local<Object> req_wrap_obj = args[0].As<Object>();
246   Local<String> string = args[1].As<String>();
247
248   // Compute the size of the storage that the string will be flattened into.
249   // For UTF8 strings that are very long, go ahead and take the hit for
250   // computing their actual size, rather than tripling the storage.
251   size_t storage_size;
252   if (encoding == UTF8 && string->Length() > 65535)
253     storage_size = StringBytes::Size(string, encoding);
254   else
255     storage_size = StringBytes::StorageSize(string, encoding);
256
257   if (storage_size > INT_MAX) {
258     args.GetReturnValue().Set(UV_ENOBUFS);
259     return;
260   }
261
262   char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
263   WriteWrap* req_wrap =
264       new(storage) WriteWrap(env, req_wrap_obj, wrap);
265
266   char* data = reinterpret_cast<char*>(ROUND_UP(
267       reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
268
269   size_t data_size;
270   data_size = StringBytes::Write(data, storage_size, string, encoding);
271
272   assert(data_size <= storage_size);
273
274   uv_buf_t buf;
275
276   buf.base = data;
277   buf.len = data_size;
278
279   if (!wrap->is_named_pipe_ipc()) {
280     err = wrap->callbacks()->DoWrite(req_wrap,
281                                      &buf,
282                                      1,
283                                      NULL,
284                                      StreamWrap::AfterWrite);
285   } else {
286     uv_handle_t* send_handle = NULL;
287
288     if (args[2]->IsObject()) {
289       Local<Object> send_handle_obj = args[2].As<Object>();
290       HandleWrap* wrap;
291       NODE_UNWRAP(send_handle_obj, HandleWrap, wrap);
292       send_handle = wrap->GetHandle();
293       // Reference StreamWrap instance to prevent it from being garbage
294       // collected before `AfterWrite` is called.
295       assert(!req_wrap->persistent().IsEmpty());
296       req_wrap->object()->Set(env->handle_string(), send_handle_obj);
297     }
298
299     err = wrap->callbacks()->DoWrite(
300         req_wrap,
301         &buf,
302         1,
303         reinterpret_cast<uv_stream_t*>(send_handle),
304         StreamWrap::AfterWrite);
305   }
306
307   req_wrap->Dispatched();
308   req_wrap->object()->Set(env->bytes_string(),
309                           Number::New(node_isolate, data_size));
310
311   if (err) {
312     req_wrap->~WriteWrap();
313     delete[] storage;
314   }
315
316   args.GetReturnValue().Set(err);
317 }
318
319
320 void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
321   Environment* env = Environment::GetCurrent(args.GetIsolate());
322   HandleScope handle_scope(args.GetIsolate());
323
324   StreamWrap* wrap;
325   NODE_UNWRAP(args.This(), StreamWrap, wrap);
326
327   assert(args[0]->IsObject());
328   assert(args[1]->IsArray());
329
330   Local<Object> req_wrap_obj = args[0].As<Object>();
331   Local<Array> chunks = args[1].As<Array>();
332   size_t count = chunks->Length() >> 1;
333
334   uv_buf_t bufs_[16];
335   uv_buf_t* bufs = bufs_;
336
337   // Determine storage size first
338   size_t storage_size = 0;
339   for (size_t i = 0; i < count; i++) {
340     Handle<Value> chunk = chunks->Get(i * 2);
341
342     if (Buffer::HasInstance(chunk))
343       continue;
344       // Buffer chunk, no additional storage required
345
346     // String chunk
347     Handle<String> string = chunk->ToString();
348     enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
349     size_t chunk_size;
350     if (encoding == UTF8 && string->Length() > 65535)
351       chunk_size = StringBytes::Size(string, encoding);
352     else
353       chunk_size = StringBytes::StorageSize(string, encoding);
354
355     storage_size += chunk_size + 15;
356   }
357
358   if (storage_size > INT_MAX) {
359     args.GetReturnValue().Set(UV_ENOBUFS);
360     return;
361   }
362
363   if (ARRAY_SIZE(bufs_) < count)
364     bufs = new uv_buf_t[count];
365
366   storage_size += sizeof(WriteWrap);
367   char* storage = new char[storage_size];
368   WriteWrap* req_wrap =
369       new(storage) WriteWrap(env, req_wrap_obj, wrap);
370
371   uint32_t bytes = 0;
372   size_t offset = sizeof(WriteWrap);
373   for (size_t i = 0; i < count; i++) {
374     Handle<Value> chunk = chunks->Get(i * 2);
375
376     // Write buffer
377     if (Buffer::HasInstance(chunk)) {
378       bufs[i].base = Buffer::Data(chunk);
379       bufs[i].len = Buffer::Length(chunk);
380       bytes += bufs[i].len;
381       continue;
382     }
383
384     // Write string
385     offset = ROUND_UP(offset, 16);
386     assert(offset < storage_size);
387     char* str_storage = storage + offset;
388     size_t str_size = storage_size - offset;
389
390     Handle<String> string = chunk->ToString();
391     enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
392     str_size = StringBytes::Write(str_storage, str_size, string, encoding);
393     bufs[i].base = str_storage;
394     bufs[i].len = str_size;
395     offset += str_size;
396     bytes += str_size;
397   }
398
399   int err = wrap->callbacks()->DoWrite(req_wrap,
400                                        bufs,
401                                        count,
402                                        NULL,
403                                        StreamWrap::AfterWrite);
404
405   // Deallocate space
406   if (bufs != bufs_)
407     delete[] bufs;
408
409   req_wrap->Dispatched();
410   req_wrap->object()->Set(env->bytes_string(),
411                           Number::New(node_isolate, bytes));
412
413   if (err) {
414     req_wrap->~WriteWrap();
415     delete[] storage;
416   }
417
418   args.GetReturnValue().Set(err);
419 }
420
421
422 void StreamWrap::WriteAsciiString(const FunctionCallbackInfo<Value>& args) {
423   WriteStringImpl<ASCII>(args);
424 }
425
426
427 void StreamWrap::WriteUtf8String(const FunctionCallbackInfo<Value>& args) {
428   WriteStringImpl<UTF8>(args);
429 }
430
431
432 void StreamWrap::WriteUcs2String(const FunctionCallbackInfo<Value>& args) {
433   WriteStringImpl<UCS2>(args);
434 }
435
436
437 void StreamWrap::AfterWrite(uv_write_t* req, int status) {
438   WriteWrap* req_wrap = container_of(req, WriteWrap, req_);
439   StreamWrap* wrap = req_wrap->wrap();
440   Environment* env = wrap->env();
441
442   Context::Scope context_scope(env->context());
443   HandleScope handle_scope(env->isolate());
444
445   // The wrap and request objects should still be there.
446   assert(req_wrap->persistent().IsEmpty() == false);
447   assert(wrap->persistent().IsEmpty() == false);
448
449   // Unref handle property
450   Local<Object> req_wrap_obj = req_wrap->object();
451   req_wrap_obj->Delete(env->handle_string());
452   wrap->callbacks_->AfterWrite(req_wrap);
453
454   Local<Value> argv[] = {
455     Integer::New(status, node_isolate),
456     wrap->object(),
457     req_wrap_obj
458   };
459
460   MakeCallback(env,
461                req_wrap_obj,
462                env->oncomplete_string(),
463                ARRAY_SIZE(argv),
464                argv);
465
466   req_wrap->~WriteWrap();
467   delete[] reinterpret_cast<char*>(req_wrap);
468 }
469
470
471 void StreamWrap::Shutdown(const FunctionCallbackInfo<Value>& args) {
472   Environment* env = Environment::GetCurrent(args.GetIsolate());
473   HandleScope handle_scope(args.GetIsolate());
474
475   StreamWrap* wrap;
476   NODE_UNWRAP(args.This(), StreamWrap, wrap);
477
478   assert(args[0]->IsObject());
479   Local<Object> req_wrap_obj = args[0].As<Object>();
480
481   ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj);
482   int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown);
483   req_wrap->Dispatched();
484   if (err) delete req_wrap;
485   args.GetReturnValue().Set(err);
486 }
487
488
489 void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
490   ShutdownWrap* req_wrap = static_cast<ShutdownWrap*>(req->data);
491   StreamWrap* wrap = static_cast<StreamWrap*>(req->handle->data);
492   Environment* env = wrap->env();
493
494   // The wrap and request objects should still be there.
495   assert(req_wrap->persistent().IsEmpty() == false);
496   assert(wrap->persistent().IsEmpty() == false);
497
498   Context::Scope context_scope(env->context());
499   HandleScope handle_scope(env->isolate());
500
501   Local<Object> req_wrap_obj = req_wrap->object();
502   Local<Value> argv[3] = {
503     Integer::New(status, node_isolate),
504     wrap->object(),
505     req_wrap_obj
506   };
507
508   MakeCallback(env,
509                req_wrap_obj,
510                env->oncomplete_string(),
511                ARRAY_SIZE(argv),
512                argv);
513
514   delete req_wrap;
515 }
516
517
518 int StreamWrapCallbacks::DoWrite(WriteWrap* w,
519                                  uv_buf_t* bufs,
520                                  size_t count,
521                                  uv_stream_t* send_handle,
522                                  uv_write_cb cb) {
523   int r;
524   if (send_handle == NULL) {
525     r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
526   } else {
527     r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb);
528   }
529
530   if (!r) {
531     size_t bytes = 0;
532     for (size_t i = 0; i < count; i++)
533       bytes += bufs[i].len;
534     if (wrap()->stream()->type == UV_TCP) {
535       NODE_COUNT_NET_BYTES_SENT(bytes);
536     } else if (wrap()->stream()->type == UV_NAMED_PIPE) {
537       NODE_COUNT_PIPE_BYTES_SENT(bytes);
538     }
539   }
540
541   wrap()->UpdateWriteQueueSize();
542
543   return r;
544 }
545
546
547 void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
548   wrap()->UpdateWriteQueueSize();
549 }
550
551
552 void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
553                                   size_t suggested_size,
554                                   uv_buf_t* buf) {
555   buf->base = static_cast<char*>(malloc(suggested_size));
556   buf->len = suggested_size;
557
558   if (buf->base == NULL && suggested_size > 0) {
559     FatalError(
560         "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)",
561         "Out Of Memory");
562   }
563 }
564
565
566 void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
567                                  ssize_t nread,
568                                  const uv_buf_t* buf,
569                                  uv_handle_type pending) {
570   Environment* env = wrap()->env();
571   Context::Scope context_scope(env->context());
572   HandleScope handle_scope(env->isolate());
573
574   Local<Value> argv[] = {
575     Integer::New(nread, node_isolate),
576     Undefined(),
577     Undefined()
578   };
579
580   if (nread < 0)  {
581     if (buf->base != NULL)
582       free(buf->base);
583     MakeCallback(env, Self(), env->onread_string(), ARRAY_SIZE(argv), argv);
584     return;
585   }
586
587   if (nread == 0) {
588     if (buf->base != NULL)
589       free(buf->base);
590     return;
591   }
592
593   char* base = static_cast<char*>(realloc(buf->base, nread));
594   assert(static_cast<size_t>(nread) <= buf->len);
595   argv[1] = Buffer::Use(env, base, nread);
596
597   Local<Object> pending_obj;
598   if (pending == UV_TCP) {
599     pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, handle);
600   } else if (pending == UV_NAMED_PIPE) {
601     pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, handle);
602   } else if (pending == UV_UDP) {
603     pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, handle);
604   } else {
605     assert(pending == UV_UNKNOWN_HANDLE);
606   }
607
608   if (!pending_obj.IsEmpty()) {
609     argv[2] = pending_obj;
610   }
611
612   MakeCallback(env,
613                wrap()->object(),
614                env->onread_string(),
615                ARRAY_SIZE(argv),
616                argv);
617 }
618
619
620 int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
621   return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb);
622 }
623
624
625 Handle<Object> StreamWrapCallbacks::Self() {
626   return wrap()->object();
627 }
628
629 }  // namespace node