1 // Copyright Joyent, Inc. and other Node contributors.
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
22 #include "stream_wrap.h"
25 #include "handle_wrap.h"
26 #include "node_buffer.h"
27 #include "node_counters.h"
28 #include "pipe_wrap.h"
35 #include <stdlib.h> // abort()
36 #include <limits.h> // INT_MAX
43 using v8::FunctionCallbackInfo;
45 using v8::HandleScope;
50 using v8::PropertyCallbackInfo;
// Constructs a StreamWrap. The stream pointer is reinterpreted as a
// uv_handle_t* and handed to the HandleWrap base together with env and the
// JS object; callbacks_ initially points at the embedded default_callbacks_.
// NOTE(review): the remaining parameters, initializers and the constructor
// body are not visible in this excerpt.
56 StreamWrap::StreamWrap(Environment* env,
59 : HandleWrap(env, object, reinterpret_cast<uv_handle_t*>(stream)),
61 default_callbacks_(this),
62 callbacks_(&default_callbacks_) {
// Property getter: returns the stream's file descriptor to JS.
// The descriptor is read from the uv stream's io_watcher.fd, and only when
// both the unwrapped StreamWrap and its stream are non-NULL.
// NOTE(review): the declaration (and default value) of `fd` and any
// platform-specific guards are not visible in this excerpt.
66 void StreamWrap::GetFD(Local<String>, const PropertyCallbackInfo<Value>& args) {
68 HandleScope scope(node_isolate);
69 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
// Skip the read when there is no wrap or the wrap has no stream.
71 if (wrap != NULL && wrap->stream() != NULL) {
72 fd = wrap->stream()->io_watcher.fd;
74 args.GetReturnValue().Set(fd);
// Mirrors the uv stream's write_queue_size onto the JS object, stored under
// the property named by env()->write_queue_size_string().
79 void StreamWrap::UpdateWriteQueueSize() {
80 HandleScope scope(node_isolate);
81 Local<Integer> write_queue_size =
82 Integer::NewFromUnsigned(stream()->write_queue_size, node_isolate);
83 object()->Set(env()->write_queue_size_string(), write_queue_size);
// JS binding: starts reading on the wrapped stream.
// Named-pipe IPC streams use uv_read2_start() with OnRead2 so that the
// callback also receives a pending handle type; every other stream uses
// uv_read_start() with OnRead. Both share OnAlloc as the allocator.
// The libuv result code is returned to JS.
// NOTE(review): the declaration of `err` and the else keyword are not
// visible in this excerpt.
87 void StreamWrap::ReadStart(const FunctionCallbackInfo<Value>& args) {
88 HandleScope scope(node_isolate);
90 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
93 if (wrap->is_named_pipe_ipc()) {
94 err = uv_read2_start(wrap->stream(), OnAlloc, OnRead2);
96 err = uv_read_start(wrap->stream(), OnAlloc, OnRead);
99 args.GetReturnValue().Set(err);
// JS binding: stops reading on the wrapped stream via uv_read_stop() and
// returns the libuv result code to JS.
103 void StreamWrap::ReadStop(const FunctionCallbackInfo<Value>& args) {
104 HandleScope scope(node_isolate);
106 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
108 int err = uv_read_stop(wrap->stream());
109 args.GetReturnValue().Set(err);
// libuv allocation callback. Recovers the StreamWrap from handle->data,
// asserts that the handle is the wrap's own stream, and delegates the
// allocation to the currently installed callbacks object (DoAlloc).
// NOTE(review): the `buf` parameter declaration is not visible in this
// excerpt.
113 void StreamWrap::OnAlloc(uv_handle_t* handle,
114 size_t suggested_size,
116 StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
117 assert(wrap->stream() == reinterpret_cast<uv_stream_t*>(handle));
118 wrap->callbacks()->DoAlloc(handle, suggested_size, buf);
// Accepts a pending handle from `pipe` into a freshly instantiated wrapper.
//
// WrapType is the wrapper class to instantiate; UVType is the matching
// libuv handle type. Returns an empty Local<Object> when instantiation
// fails, otherwise the new wrapper object (escaped via scope.Close).
// NOTE(review): the declaration of `handle` and the action taken when
// uv_accept() reports failure are not visible in this excerpt.
122 template <class WrapType, class UVType>
123 static Local<Object> AcceptHandle(Environment* env, uv_stream_t* pipe) {
124 HandleScope scope(node_isolate);
125 Local<Object> wrap_obj;
128 wrap_obj = WrapType::Instantiate(env);
// Instantiate() can yield an empty handle; propagate that to the caller.
129 if (wrap_obj.IsEmpty())
130 return Local<Object>();
132 WrapType* wrap = Unwrap<WrapType>(wrap_obj);
133 handle = wrap->UVHandle();
// A non-zero return from uv_accept() takes the branch below.
135 if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
138 return scope.Close(wrap_obj);
// Shared read path for OnRead and OnRead2. Recovers the StreamWrap from
// handle->data, bumps the TCP or named-pipe receive counter with nread, and
// forwards the read (including the pending handle type) to the installed
// callbacks object (DoRead).
// NOTE(review): the nread/buf parameters and any condition guarding the
// counter updates are not visible in this excerpt.
142 void StreamWrap::OnReadCommon(uv_stream_t* handle,
145 uv_handle_type pending) {
146 StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
148 // We should not be getting this callback if someone has already called
149 // uv_close() on the handle.
150 assert(wrap->persistent().IsEmpty() == false);
153 if (wrap->is_tcp()) {
154 NODE_COUNT_NET_BYTES_RECV(nread);
155 } else if (wrap->is_named_pipe()) {
156 NODE_COUNT_PIPE_BYTES_RECV(nread);
160 wrap->callbacks()->DoRead(handle, nread, buf, pending);
// libuv read callback for ordinary streams. No handle can be pending on
// this path, so UV_UNKNOWN_HANDLE is passed to OnReadCommon.
164 void StreamWrap::OnRead(uv_stream_t* handle,
166 const uv_buf_t* buf) {
167 OnReadCommon(handle, nread, buf, UV_UNKNOWN_HANDLE);
// libuv read2 callback (registered by ReadStart for named-pipe IPC).
// Casts the pipe to a generic stream and forwards the pending handle type
// unchanged to OnReadCommon.
171 void StreamWrap::OnRead2(uv_pipe_t* handle,
174 uv_handle_type pending) {
175 OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending);
// Fills `buf` so that it points directly at the data of the Buffer `val`
// (no copy). `val` must be a Buffer instance; this is asserted.
// NOTE(review): the return statement is not visible in this excerpt.
179 size_t StreamWrap::WriteBuffer(Handle<Value> val, uv_buf_t* buf) {
180 assert(Buffer::HasInstance(val));
182 // Simple non-writev case
183 buf->base = Buffer::Data(val);
184 buf->len = Buffer::Length(val);
// JS binding: writes a single Buffer to the stream.
//
// args[0] is the request object, args[1] the Buffer to send (both asserted).
// The libuv result code is returned to JS; the byte count is stored on the
// request object under env->bytes_string(), and an error message from the
// callbacks object may be stored under env->error_string().
// NOTE(review): the declaration of `buf`, the middle DoWrite arguments and
// the conditions around the error/cleanup paths are not visible in this
// excerpt.
190 void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
191 HandleScope handle_scope(args.GetIsolate());
192 Environment* env = Environment::GetCurrent(args.GetIsolate());
194 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
196 assert(args[0]->IsObject());
197 assert(Buffer::HasInstance(args[1]));
199 Local<Object> req_wrap_obj = args[0].As<Object>();
200 Local<Object> buf_obj = args[1].As<Object>();
202 size_t length = Buffer::Length(buf_obj);
// The WriteWrap is placement-constructed in a raw char array; AfterWrite in
// this file releases such storage with delete[] after calling the destructor
// explicitly.
203 char* storage = new char[sizeof(WriteWrap)];
204 WriteWrap* req_wrap =
205 new(storage) WriteWrap(env, req_wrap_obj, wrap);
208 WriteBuffer(buf_obj, &buf);
210 int err = wrap->callbacks()->DoWrite(req_wrap,
214 StreamWrap::AfterWrite);
215 req_wrap->Dispatched();
216 req_wrap_obj->Set(env->bytes_string(),
217 Integer::NewFromUnsigned(length, node_isolate));
218 const char* msg = wrap->callbacks()->Error();
220 req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
// Explicit destructor call: the object was created with placement new.
// NOTE(review): presumably reached only when the write failed - confirm.
223 req_wrap->~WriteWrap();
227 args.GetReturnValue().Set(err);
// JS binding (templated on encoding): writes a string to the stream.
//
// args[0] is the request object and args[1] the string (both asserted).
// If args[2] is an object, it is unwrapped as a HandleWrap and its handle
// is sent along with the data; this only happens on the named-pipe IPC
// path. The string is flattened into storage allocated together with the
// WriteWrap. Returns UV_ENOBUFS when the computed storage size exceeds
// INT_MAX, otherwise the result of DoWrite.
// NOTE(review): several declarations (storage_size, data_size, buf, err),
// the else keywords, early-return/inline-write paths and the conditions
// around error handling are not visible in this excerpt.
231 template <enum encoding encoding>
232 void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
233 HandleScope handle_scope(args.GetIsolate());
234 Environment* env = Environment::GetCurrent(args.GetIsolate());
237 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
239 assert(args[0]->IsObject());
240 assert(args[1]->IsString());
242 Local<Object> req_wrap_obj = args[0].As<Object>();
243 Local<String> string = args[1].As<String>();
245 // Compute the size of the storage that the string will be flattened into.
246 // For UTF8 strings that are very long, go ahead and take the hit for
247 // computing their actual size, rather than tripling the storage.
249 if (encoding == UTF8 && string->Length() > 65535)
250 storage_size = StringBytes::Size(string, encoding);
252 storage_size = StringBytes::StorageSize(string, encoding);
254 if (storage_size > INT_MAX) {
255 args.GetReturnValue().Set(UV_ENOBUFS);
// One allocation holds the WriteWrap followed by the string bytes; the extra
// 15 bytes leave room to round the data pointer up to a 16-byte boundary.
259 char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
260 WriteWrap* req_wrap =
261 new(storage) WriteWrap(env, req_wrap_obj, wrap);
263 char* data = reinterpret_cast<char*>(ROUND_UP(
264 reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
267 data_size = StringBytes::Write(data, storage_size, string, encoding);
269 assert(data_size <= storage_size);
// Plain streams write the data only; the IPC branch may also pass a handle.
276 if (!wrap->is_named_pipe_ipc()) {
277 err = wrap->callbacks()->DoWrite(req_wrap,
281 StreamWrap::AfterWrite);
283 uv_handle_t* send_handle = NULL;
285 if (args[2]->IsObject()) {
286 Local<Object> send_handle_obj = args[2].As<Object>();
// NOTE(review): this local `wrap` shadows the StreamWrap* `wrap` above
// within this scope.
287 HandleWrap* wrap = Unwrap<HandleWrap>(send_handle_obj);
288 send_handle = wrap->GetHandle();
289 // Reference StreamWrap instance to prevent it from being garbage
290 // collected before `AfterWrite` is called.
291 assert(!req_wrap->persistent().IsEmpty());
292 req_wrap->object()->Set(env->handle_string(), send_handle_obj);
295 err = wrap->callbacks()->DoWrite(
299 reinterpret_cast<uv_stream_t*>(send_handle),
300 StreamWrap::AfterWrite);
303 req_wrap->Dispatched();
304 req_wrap->object()->Set(env->bytes_string(),
305 Integer::NewFromUnsigned(data_size, node_isolate));
306 const char* msg = wrap->callbacks()->Error();
308 req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
// Explicit destructor call: the object was created with placement new.
// NOTE(review): presumably reached only when the write failed - confirm.
311 req_wrap->~WriteWrap();
315 args.GetReturnValue().Set(err);
// JS binding: writes several chunks with one DoWrite call.
//
// args[0] is the request object; args[1] is an array laid out as
// [chunk0, encoding0, chunk1, encoding1, ...], hence count = length / 2.
// Buffer chunks are referenced in place; string chunks are encoded into
// storage allocated together with the WriteWrap. Returns UV_ENOBUFS when
// the total string storage exceeds INT_MAX, otherwise the DoWrite result.
// The total byte count is stored on the request object under
// env->bytes_string().
// NOTE(review): declarations of bufs_, chunk_size and bytes, the DoWrite
// arguments, and the cleanup of a heap-allocated `bufs` are not visible in
// this excerpt.
319 void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
320 HandleScope handle_scope(args.GetIsolate());
321 Environment* env = Environment::GetCurrent(args.GetIsolate());
323 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
325 assert(args[0]->IsObject());
326 assert(args[1]->IsArray());
328 Local<Object> req_wrap_obj = args[0].As<Object>();
329 Local<Array> chunks = args[1].As<Array>();
330 size_t count = chunks->Length() >> 1;
// Start with the fixed array bufs_; a heap array replaces it further down if
// count is larger.
333 uv_buf_t* bufs = bufs_;
335 // Determine storage size first
336 size_t storage_size = 0;
337 for (size_t i = 0; i < count; i++) {
338 Handle<Value> chunk = chunks->Get(i * 2);
340 if (Buffer::HasInstance(chunk))
342 // Buffer chunk, no additional storage required
345 Handle<String> string = chunk->ToString();
346 enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
// Same sizing rule as WriteStringImpl: exact size for long UTF8 strings,
// StorageSize() otherwise.
348 if (encoding == UTF8 && string->Length() > 65535)
349 chunk_size = StringBytes::Size(string, encoding);
351 chunk_size = StringBytes::StorageSize(string, encoding);
// +15 leaves room for rounding each string's offset up to a multiple of 16.
353 storage_size += chunk_size + 15;
356 if (storage_size > INT_MAX) {
357 args.GetReturnValue().Set(UV_ENOBUFS);
361 if (ARRAY_SIZE(bufs_) < count)
362 bufs = new uv_buf_t[count];
364 storage_size += sizeof(WriteWrap);
365 char* storage = new char[storage_size];
366 WriteWrap* req_wrap =
367 new(storage) WriteWrap(env, req_wrap_obj, wrap);
// Second pass: fill bufs[]. String data is placed after the WriteWrap.
370 size_t offset = sizeof(WriteWrap);
371 for (size_t i = 0; i < count; i++) {
372 Handle<Value> chunk = chunks->Get(i * 2);
375 if (Buffer::HasInstance(chunk)) {
376 bufs[i].base = Buffer::Data(chunk);
377 bufs[i].len = Buffer::Length(chunk);
378 bytes += bufs[i].len;
383 offset = ROUND_UP(offset, 16);
384 assert(offset < storage_size);
385 char* str_storage = storage + offset;
386 size_t str_size = storage_size - offset;
388 Handle<String> string = chunk->ToString();
389 enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
// str_size goes in as the remaining capacity and comes back as the number of
// bytes actually written.
390 str_size = StringBytes::Write(str_storage, str_size, string, encoding);
391 bufs[i].base = str_storage;
392 bufs[i].len = str_size;
397 int err = wrap->callbacks()->DoWrite(req_wrap,
401 StreamWrap::AfterWrite);
407 req_wrap->Dispatched();
408 req_wrap->object()->Set(env->bytes_string(),
409 Number::New(node_isolate, bytes));
410 const char* msg = wrap->callbacks()->Error();
412 req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
// Explicit destructor call: the object was created with placement new.
// NOTE(review): presumably reached only when the write failed - confirm.
415 req_wrap->~WriteWrap();
419 args.GetReturnValue().Set(err);
// JS binding: writes a string using the ASCII instantiation of
// WriteStringImpl.
423 void StreamWrap::WriteAsciiString(const FunctionCallbackInfo<Value>& args) {
424 WriteStringImpl<ASCII>(args);
// JS binding: writes a string using the UTF8 instantiation of
// WriteStringImpl.
428 void StreamWrap::WriteUtf8String(const FunctionCallbackInfo<Value>& args) {
429 WriteStringImpl<UTF8>(args);
// JS binding: writes a string using the UCS2 instantiation of
// WriteStringImpl.
433 void StreamWrap::WriteUcs2String(const FunctionCallbackInfo<Value>& args) {
434 WriteStringImpl<UCS2>(args);
// libuv write-completion callback. Recovers the WriteWrap that embeds
// `req`, drops the request object's handle property, notifies the callbacks
// object, invokes the request's oncomplete callback with the status, and
// finally destroys the WriteWrap and frees its storage.
// NOTE(review): the remaining argv entries and the condition guarding the
// error-message argument are not visible in this excerpt.
438 void StreamWrap::AfterWrite(uv_write_t* req, int status) {
439 WriteWrap* req_wrap = CONTAINER_OF(req, WriteWrap, req_);
440 StreamWrap* wrap = req_wrap->wrap();
441 Environment* env = wrap->env();
443 HandleScope handle_scope(env->isolate());
444 Context::Scope context_scope(env->context());
446 // The wrap and request objects should still be there.
447 assert(req_wrap->persistent().IsEmpty() == false);
448 assert(wrap->persistent().IsEmpty() == false);
450 // Unref handle property
451 Local<Object> req_wrap_obj = req_wrap->object();
452 req_wrap_obj->Delete(env->handle_string());
453 wrap->callbacks()->AfterWrite(req_wrap);
455 Local<Value> argv[] = {
456 Integer::New(status, node_isolate),
462 const char* msg = wrap->callbacks()->Error();
464 argv[3] = OneByteString(env->isolate(), msg);
466 req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
// The write functions in this file build the WriteWrap with placement new in
// a char array, so it is destroyed explicitly and the array is released with
// delete[].
468 req_wrap->~WriteWrap();
469 delete[] reinterpret_cast<char*>(req_wrap);
// JS binding: requests a shutdown of the stream.
// args[0] is the request object (asserted). A ShutdownWrap is allocated for
// it and handed to the callbacks object's DoShutdown with AfterShutdown as
// the completion callback; the result code is returned to JS.
// NOTE(review): any handling of a non-zero `err` (for example releasing
// req_wrap) is not visible in this excerpt.
473 void StreamWrap::Shutdown(const FunctionCallbackInfo<Value>& args) {
474 HandleScope handle_scope(args.GetIsolate());
475 Environment* env = Environment::GetCurrent(args.GetIsolate());
477 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
479 assert(args[0]->IsObject());
480 Local<Object> req_wrap_obj = args[0].As<Object>();
482 ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj);
483 int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown);
484 req_wrap->Dispatched();
487 args.GetReturnValue().Set(err);
// libuv shutdown-completion callback. Recovers the ShutdownWrap from
// req->data and the StreamWrap from req->handle->data, then invokes the
// request's oncomplete callback with a three-element argv whose first
// entry is the status.
// NOTE(review): the other two argv entries and the release of req_wrap are
// not visible in this excerpt.
491 void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
492 ShutdownWrap* req_wrap = static_cast<ShutdownWrap*>(req->data);
493 StreamWrap* wrap = static_cast<StreamWrap*>(req->handle->data);
494 Environment* env = wrap->env();
496 // The wrap and request objects should still be there.
497 assert(req_wrap->persistent().IsEmpty() == false);
498 assert(wrap->persistent().IsEmpty() == false);
500 HandleScope handle_scope(env->isolate());
501 Context::Scope context_scope(env->context());
503 Local<Object> req_wrap_obj = req_wrap->object();
504 Local<Value> argv[3] = {
505 Integer::New(status, node_isolate),
510 req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
// Returns an error message as a C string. Callers in this file fetch it
// after a write and may attach it to the request object or to the callback
// arguments.
// NOTE(review): the body is not visible in this excerpt, so the value
// returned by this default implementation cannot be stated here.
516 const char* StreamWrapCallbacks::Error() {
// Default write implementation. Issues uv_write() when no handle is being
// sent and uv_write2() when send_handle is non-NULL, adds the total length
// of all buffers to the TCP or named-pipe sent-bytes counter, and refreshes
// the JS-visible write queue size.
// NOTE(review): the bufs/count/cb parameters, the declarations of `r` and
// `bytes`, any condition guarding the counter update, and the return
// statement are not visible in this excerpt.
521 int StreamWrapCallbacks::DoWrite(WriteWrap* w,
524 uv_stream_t* send_handle,
527 if (send_handle == NULL) {
528 r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
530 r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb);
// Sum the buffer lengths for the byte counters below.
535 for (size_t i = 0; i < count; i++)
536 bytes += bufs[i].len;
537 if (wrap()->stream()->type == UV_TCP) {
538 NODE_COUNT_NET_BYTES_SENT(bytes);
539 } else if (wrap()->stream()->type == UV_NAMED_PIPE) {
540 NODE_COUNT_PIPE_BYTES_SENT(bytes);
544 wrap()->UpdateWriteQueueSize();
// Default post-write hook: refreshes the JS-visible write queue size.
// The WriteWrap argument is not used by the visible code.
550 void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
551 wrap()->UpdateWriteQueueSize();
// Default allocator for reads: malloc()s suggested_size bytes and records
// pointer and length in `buf`. A NULL result for a non-zero request enters
// the failure branch below.
// NOTE(review): the `buf` parameter declaration and the call made in the
// failure branch (only its string argument is visible) are not shown in
// this excerpt.
555 void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
556 size_t suggested_size,
558 buf->base = static_cast<char*>(malloc(suggested_size));
559 buf->len = suggested_size;
// malloc(0) may legitimately return NULL, hence the suggested_size > 0 check.
561 if (buf->base == NULL && suggested_size > 0) {
563 "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)",
// Default read handler. Builds the onread arguments: argv[0] is nread,
// argv[1] becomes a Buffer that takes over the read data, and argv[2] is
// set to a newly accepted handle wrapper when `pending` names a TCP, named
// pipe or UDP handle. Then invokes the wrap's onread callback.
// NOTE(review): the nread/buf parameters, the conditions selecting the two
// early paths below (and what they do with buf->base), and the remaining
// argv initializers are not visible in this excerpt.
569 void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
572 uv_handle_type pending) {
573 Environment* env = wrap()->env();
574 HandleScope handle_scope(env->isolate());
575 Context::Scope context_scope(env->context());
577 Local<Value> argv[] = {
578 Integer::New(nread, node_isolate),
584 if (buf->base != NULL)
586 wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
591 if (buf->base != NULL)
// Resize the allocation to the number of bytes actually read before handing
// it to Buffer::Use.
596 char* base = static_cast<char*>(realloc(buf->base, nread));
597 assert(static_cast<size_t>(nread) <= buf->len);
598 argv[1] = Buffer::Use(env, base, nread);
// Accept the pending handle, if any, into a wrapper of the matching type.
600 Local<Object> pending_obj;
601 if (pending == UV_TCP) {
602 pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, handle);
603 } else if (pending == UV_NAMED_PIPE) {
604 pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, handle);
605 } else if (pending == UV_UDP) {
606 pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, handle);
608 assert(pending == UV_UNKNOWN_HANDLE);
611 if (!pending_obj.IsEmpty()) {
612 argv[2] = pending_obj;
615 wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
// Default shutdown implementation: forwards to uv_shutdown() on the wrapped
// stream using the request embedded in req_wrap, and returns its result.
619 int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
620 return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb);