e0079b813646bb3c9352b45848135cc5b2cea8cd
[platform/upstream/nodejs.git] / src / stream_wrap.cc
1 // Copyright Joyent, Inc. and other Node contributors.
2 //
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
10 //
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
13 //
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22 #include "stream_wrap.h"
23 #include "env-inl.h"
24 #include "env.h"
25 #include "handle_wrap.h"
26 #include "node_buffer.h"
27 #include "node_counters.h"
28 #include "pipe_wrap.h"
29 #include "req_wrap.h"
30 #include "tcp_wrap.h"
31 #include "udp_wrap.h"
32 #include "util.h"
33 #include "util-inl.h"
34
35 #include <stdlib.h>  // abort()
36 #include <limits.h>  // INT_MAX
37
38
39 namespace node {
40
41 using v8::Array;
42 using v8::Context;
43 using v8::FunctionCallbackInfo;
44 using v8::Handle;
45 using v8::HandleScope;
46 using v8::Integer;
47 using v8::Local;
48 using v8::Number;
49 using v8::Object;
50 using v8::PropertyCallbackInfo;
51 using v8::String;
52 using v8::Undefined;
53 using v8::Value;
54
55
56 StreamWrap::StreamWrap(Environment* env,
57                        Local<Object> object,
58                        uv_stream_t* stream)
59     : HandleWrap(env, object, reinterpret_cast<uv_handle_t*>(stream)),
60       stream_(stream),
61       default_callbacks_(this),
62       callbacks_(&default_callbacks_) {
63 }
64
65
66 void StreamWrap::GetFD(Local<String>, const PropertyCallbackInfo<Value>& args) {
67 #if !defined(_WIN32)
68   HandleScope scope(node_isolate);
69   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
70   int fd = -1;
71   if (wrap != NULL && wrap->stream() != NULL) {
72     fd = wrap->stream()->io_watcher.fd;
73   }
74   args.GetReturnValue().Set(fd);
75 #endif
76 }
77
78
79 void StreamWrap::UpdateWriteQueueSize() {
80   HandleScope scope(node_isolate);
81   Local<Integer> write_queue_size =
82       Integer::NewFromUnsigned(stream()->write_queue_size, node_isolate);
83   object()->Set(env()->write_queue_size_string(), write_queue_size);
84 }
85
86
87 void StreamWrap::ReadStart(const FunctionCallbackInfo<Value>& args) {
88   HandleScope scope(node_isolate);
89
90   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
91
92   int err;
93   if (wrap->is_named_pipe_ipc()) {
94     err = uv_read2_start(wrap->stream(), OnAlloc, OnRead2);
95   } else {
96     err = uv_read_start(wrap->stream(), OnAlloc, OnRead);
97   }
98
99   args.GetReturnValue().Set(err);
100 }
101
102
103 void StreamWrap::ReadStop(const FunctionCallbackInfo<Value>& args) {
104   HandleScope scope(node_isolate);
105
106   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
107
108   int err = uv_read_stop(wrap->stream());
109   args.GetReturnValue().Set(err);
110 }
111
112
113 void StreamWrap::OnAlloc(uv_handle_t* handle,
114                          size_t suggested_size,
115                          uv_buf_t* buf) {
116   StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
117   assert(wrap->stream() == reinterpret_cast<uv_stream_t*>(handle));
118   wrap->callbacks()->DoAlloc(handle, suggested_size, buf);
119 }
120
121
122 template <class WrapType, class UVType>
123 static Local<Object> AcceptHandle(Environment* env, uv_stream_t* pipe) {
124   HandleScope scope(node_isolate);
125   Local<Object> wrap_obj;
126   UVType* handle;
127
128   wrap_obj = WrapType::Instantiate(env);
129   if (wrap_obj.IsEmpty())
130     return Local<Object>();
131
132   WrapType* wrap = Unwrap<WrapType>(wrap_obj);
133   handle = wrap->UVHandle();
134
135   if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
136     abort();
137
138   return scope.Close(wrap_obj);
139 }
140
141
142 void StreamWrap::OnReadCommon(uv_stream_t* handle,
143                               ssize_t nread,
144                               const uv_buf_t* buf,
145                               uv_handle_type pending) {
146   StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
147
148   // We should not be getting this callback if someone as already called
149   // uv_close() on the handle.
150   assert(wrap->persistent().IsEmpty() == false);
151
152   if (nread > 0) {
153     if (wrap->is_tcp()) {
154       NODE_COUNT_NET_BYTES_RECV(nread);
155     } else if (wrap->is_named_pipe()) {
156       NODE_COUNT_PIPE_BYTES_RECV(nread);
157     }
158   }
159
160   wrap->callbacks()->DoRead(handle, nread, buf, pending);
161 }
162
163
164 void StreamWrap::OnRead(uv_stream_t* handle,
165                         ssize_t nread,
166                         const uv_buf_t* buf) {
167   OnReadCommon(handle, nread, buf, UV_UNKNOWN_HANDLE);
168 }
169
170
171 void StreamWrap::OnRead2(uv_pipe_t* handle,
172                          ssize_t nread,
173                          const uv_buf_t* buf,
174                          uv_handle_type pending) {
175   OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending);
176 }
177
178
179 size_t StreamWrap::WriteBuffer(Handle<Value> val, uv_buf_t* buf) {
180   assert(Buffer::HasInstance(val));
181
182   // Simple non-writev case
183   buf->base = Buffer::Data(val);
184   buf->len = Buffer::Length(val);
185
186   return buf->len;
187 }
188
189
190 void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
191   HandleScope handle_scope(args.GetIsolate());
192   Environment* env = Environment::GetCurrent(args.GetIsolate());
193
194   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
195
196   assert(args[0]->IsObject());
197   assert(Buffer::HasInstance(args[1]));
198
199   Local<Object> req_wrap_obj = args[0].As<Object>();
200   Local<Object> buf_obj = args[1].As<Object>();
201
202   size_t length = Buffer::Length(buf_obj);
203   char* storage = new char[sizeof(WriteWrap)];
204   WriteWrap* req_wrap =
205       new(storage) WriteWrap(env, req_wrap_obj, wrap);
206
207   uv_buf_t buf;
208   WriteBuffer(buf_obj, &buf);
209
210   int err = wrap->callbacks()->DoWrite(req_wrap,
211                                        &buf,
212                                        1,
213                                        NULL,
214                                        StreamWrap::AfterWrite);
215   req_wrap->Dispatched();
216   req_wrap_obj->Set(env->bytes_string(),
217                     Integer::NewFromUnsigned(length, node_isolate));
218   const char* msg = wrap->callbacks()->Error();
219   if (msg != NULL)
220     req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
221
222   if (err) {
223     req_wrap->~WriteWrap();
224     delete[] storage;
225   }
226
227   args.GetReturnValue().Set(err);
228 }
229
230
231 template <enum encoding encoding>
232 void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
233   HandleScope handle_scope(args.GetIsolate());
234   Environment* env = Environment::GetCurrent(args.GetIsolate());
235   int err;
236
237   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
238
239   assert(args[0]->IsObject());
240   assert(args[1]->IsString());
241
242   Local<Object> req_wrap_obj = args[0].As<Object>();
243   Local<String> string = args[1].As<String>();
244
245   // Compute the size of the storage that the string will be flattened into.
246   // For UTF8 strings that are very long, go ahead and take the hit for
247   // computing their actual size, rather than tripling the storage.
248   size_t storage_size;
249   if (encoding == UTF8 && string->Length() > 65535)
250     storage_size = StringBytes::Size(string, encoding);
251   else
252     storage_size = StringBytes::StorageSize(string, encoding);
253
254   if (storage_size > INT_MAX) {
255     args.GetReturnValue().Set(UV_ENOBUFS);
256     return;
257   }
258
259   char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
260   WriteWrap* req_wrap =
261       new(storage) WriteWrap(env, req_wrap_obj, wrap);
262
263   char* data = reinterpret_cast<char*>(ROUND_UP(
264       reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
265
266   size_t data_size;
267   data_size = StringBytes::Write(data, storage_size, string, encoding);
268
269   assert(data_size <= storage_size);
270
271   uv_buf_t buf;
272
273   buf.base = data;
274   buf.len = data_size;
275
276   if (!wrap->is_named_pipe_ipc()) {
277     err = wrap->callbacks()->DoWrite(req_wrap,
278                                      &buf,
279                                      1,
280                                      NULL,
281                                      StreamWrap::AfterWrite);
282   } else {
283     uv_handle_t* send_handle = NULL;
284
285     if (args[2]->IsObject()) {
286       Local<Object> send_handle_obj = args[2].As<Object>();
287       HandleWrap* wrap = Unwrap<HandleWrap>(send_handle_obj);
288       send_handle = wrap->GetHandle();
289       // Reference StreamWrap instance to prevent it from being garbage
290       // collected before `AfterWrite` is called.
291       assert(!req_wrap->persistent().IsEmpty());
292       req_wrap->object()->Set(env->handle_string(), send_handle_obj);
293     }
294
295     err = wrap->callbacks()->DoWrite(
296         req_wrap,
297         &buf,
298         1,
299         reinterpret_cast<uv_stream_t*>(send_handle),
300         StreamWrap::AfterWrite);
301   }
302
303   req_wrap->Dispatched();
304   req_wrap->object()->Set(env->bytes_string(),
305                           Integer::NewFromUnsigned(data_size, node_isolate));
306   const char* msg = wrap->callbacks()->Error();
307   if (msg != NULL)
308     req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
309
310   if (err) {
311     req_wrap->~WriteWrap();
312     delete[] storage;
313   }
314
315   args.GetReturnValue().Set(err);
316 }
317
318
319 void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
320   HandleScope handle_scope(args.GetIsolate());
321   Environment* env = Environment::GetCurrent(args.GetIsolate());
322
323   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
324
325   assert(args[0]->IsObject());
326   assert(args[1]->IsArray());
327
328   Local<Object> req_wrap_obj = args[0].As<Object>();
329   Local<Array> chunks = args[1].As<Array>();
330   size_t count = chunks->Length() >> 1;
331
332   uv_buf_t bufs_[16];
333   uv_buf_t* bufs = bufs_;
334
335   // Determine storage size first
336   size_t storage_size = 0;
337   for (size_t i = 0; i < count; i++) {
338     Handle<Value> chunk = chunks->Get(i * 2);
339
340     if (Buffer::HasInstance(chunk))
341       continue;
342       // Buffer chunk, no additional storage required
343
344     // String chunk
345     Handle<String> string = chunk->ToString();
346     enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
347     size_t chunk_size;
348     if (encoding == UTF8 && string->Length() > 65535)
349       chunk_size = StringBytes::Size(string, encoding);
350     else
351       chunk_size = StringBytes::StorageSize(string, encoding);
352
353     storage_size += chunk_size + 15;
354   }
355
356   if (storage_size > INT_MAX) {
357     args.GetReturnValue().Set(UV_ENOBUFS);
358     return;
359   }
360
361   if (ARRAY_SIZE(bufs_) < count)
362     bufs = new uv_buf_t[count];
363
364   storage_size += sizeof(WriteWrap);
365   char* storage = new char[storage_size];
366   WriteWrap* req_wrap =
367       new(storage) WriteWrap(env, req_wrap_obj, wrap);
368
369   uint32_t bytes = 0;
370   size_t offset = sizeof(WriteWrap);
371   for (size_t i = 0; i < count; i++) {
372     Handle<Value> chunk = chunks->Get(i * 2);
373
374     // Write buffer
375     if (Buffer::HasInstance(chunk)) {
376       bufs[i].base = Buffer::Data(chunk);
377       bufs[i].len = Buffer::Length(chunk);
378       bytes += bufs[i].len;
379       continue;
380     }
381
382     // Write string
383     offset = ROUND_UP(offset, 16);
384     assert(offset < storage_size);
385     char* str_storage = storage + offset;
386     size_t str_size = storage_size - offset;
387
388     Handle<String> string = chunk->ToString();
389     enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
390     str_size = StringBytes::Write(str_storage, str_size, string, encoding);
391     bufs[i].base = str_storage;
392     bufs[i].len = str_size;
393     offset += str_size;
394     bytes += str_size;
395   }
396
397   int err = wrap->callbacks()->DoWrite(req_wrap,
398                                        bufs,
399                                        count,
400                                        NULL,
401                                        StreamWrap::AfterWrite);
402
403   // Deallocate space
404   if (bufs != bufs_)
405     delete[] bufs;
406
407   req_wrap->Dispatched();
408   req_wrap->object()->Set(env->bytes_string(),
409                           Number::New(node_isolate, bytes));
410   const char* msg = wrap->callbacks()->Error();
411   if (msg != NULL)
412     req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
413
414   if (err) {
415     req_wrap->~WriteWrap();
416     delete[] storage;
417   }
418
419   args.GetReturnValue().Set(err);
420 }
421
422
423 void StreamWrap::WriteAsciiString(const FunctionCallbackInfo<Value>& args) {
424   WriteStringImpl<ASCII>(args);
425 }
426
427
428 void StreamWrap::WriteUtf8String(const FunctionCallbackInfo<Value>& args) {
429   WriteStringImpl<UTF8>(args);
430 }
431
432
433 void StreamWrap::WriteUcs2String(const FunctionCallbackInfo<Value>& args) {
434   WriteStringImpl<UCS2>(args);
435 }
436
437
438 void StreamWrap::AfterWrite(uv_write_t* req, int status) {
439   WriteWrap* req_wrap = CONTAINER_OF(req, WriteWrap, req_);
440   StreamWrap* wrap = req_wrap->wrap();
441   Environment* env = wrap->env();
442
443   HandleScope handle_scope(env->isolate());
444   Context::Scope context_scope(env->context());
445
446   // The wrap and request objects should still be there.
447   assert(req_wrap->persistent().IsEmpty() == false);
448   assert(wrap->persistent().IsEmpty() == false);
449
450   // Unref handle property
451   Local<Object> req_wrap_obj = req_wrap->object();
452   req_wrap_obj->Delete(env->handle_string());
453   wrap->callbacks()->AfterWrite(req_wrap);
454
455   Local<Value> argv[] = {
456     Integer::New(status, node_isolate),
457     wrap->object(),
458     req_wrap_obj,
459     Undefined()
460   };
461
462   const char* msg = wrap->callbacks()->Error();
463   if (msg != NULL)
464     argv[3] = OneByteString(env->isolate(), msg);
465
466   req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
467
468   req_wrap->~WriteWrap();
469   delete[] reinterpret_cast<char*>(req_wrap);
470 }
471
472
473 void StreamWrap::Shutdown(const FunctionCallbackInfo<Value>& args) {
474   HandleScope handle_scope(args.GetIsolate());
475   Environment* env = Environment::GetCurrent(args.GetIsolate());
476
477   StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
478
479   assert(args[0]->IsObject());
480   Local<Object> req_wrap_obj = args[0].As<Object>();
481
482   ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj);
483   int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown);
484   req_wrap->Dispatched();
485   if (err)
486     delete req_wrap;
487   args.GetReturnValue().Set(err);
488 }
489
490
491 void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
492   ShutdownWrap* req_wrap = static_cast<ShutdownWrap*>(req->data);
493   StreamWrap* wrap = static_cast<StreamWrap*>(req->handle->data);
494   Environment* env = wrap->env();
495
496   // The wrap and request objects should still be there.
497   assert(req_wrap->persistent().IsEmpty() == false);
498   assert(wrap->persistent().IsEmpty() == false);
499
500   HandleScope handle_scope(env->isolate());
501   Context::Scope context_scope(env->context());
502
503   Local<Object> req_wrap_obj = req_wrap->object();
504   Local<Value> argv[3] = {
505     Integer::New(status, node_isolate),
506     wrap->object(),
507     req_wrap_obj
508   };
509
510   req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
511
512   delete req_wrap;
513 }
514
515
516 const char* StreamWrapCallbacks::Error() {
517   return NULL;
518 }
519
520
521 int StreamWrapCallbacks::DoWrite(WriteWrap* w,
522                                  uv_buf_t* bufs,
523                                  size_t count,
524                                  uv_stream_t* send_handle,
525                                  uv_write_cb cb) {
526   int r;
527   if (send_handle == NULL) {
528     r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
529   } else {
530     r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb);
531   }
532
533   if (!r) {
534     size_t bytes = 0;
535     for (size_t i = 0; i < count; i++)
536       bytes += bufs[i].len;
537     if (wrap()->stream()->type == UV_TCP) {
538       NODE_COUNT_NET_BYTES_SENT(bytes);
539     } else if (wrap()->stream()->type == UV_NAMED_PIPE) {
540       NODE_COUNT_PIPE_BYTES_SENT(bytes);
541     }
542   }
543
544   wrap()->UpdateWriteQueueSize();
545
546   return r;
547 }
548
549
550 void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
551   wrap()->UpdateWriteQueueSize();
552 }
553
554
555 void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
556                                   size_t suggested_size,
557                                   uv_buf_t* buf) {
558   buf->base = static_cast<char*>(malloc(suggested_size));
559   buf->len = suggested_size;
560
561   if (buf->base == NULL && suggested_size > 0) {
562     FatalError(
563         "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)",
564         "Out Of Memory");
565   }
566 }
567
568
569 void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
570                                  ssize_t nread,
571                                  const uv_buf_t* buf,
572                                  uv_handle_type pending) {
573   Environment* env = wrap()->env();
574   HandleScope handle_scope(env->isolate());
575   Context::Scope context_scope(env->context());
576
577   Local<Value> argv[] = {
578     Integer::New(nread, node_isolate),
579     Undefined(),
580     Undefined()
581   };
582
583   if (nread < 0)  {
584     if (buf->base != NULL)
585       free(buf->base);
586     wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
587     return;
588   }
589
590   if (nread == 0) {
591     if (buf->base != NULL)
592       free(buf->base);
593     return;
594   }
595
596   char* base = static_cast<char*>(realloc(buf->base, nread));
597   assert(static_cast<size_t>(nread) <= buf->len);
598   argv[1] = Buffer::Use(env, base, nread);
599
600   Local<Object> pending_obj;
601   if (pending == UV_TCP) {
602     pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, handle);
603   } else if (pending == UV_NAMED_PIPE) {
604     pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, handle);
605   } else if (pending == UV_UDP) {
606     pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, handle);
607   } else {
608     assert(pending == UV_UNKNOWN_HANDLE);
609   }
610
611   if (!pending_obj.IsEmpty()) {
612     argv[2] = pending_obj;
613   }
614
615   wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
616 }
617
618
619 int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
620   return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb);
621 }
622
623 }  // namespace node