1 // Copyright Joyent, Inc. and other Node contributors.
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
22 #include "stream_wrap.h"
25 #include "handle_wrap.h"
26 #include "node_buffer.h"
27 #include "node_counters.h"
28 #include "pipe_wrap.h"
35 #include <stdlib.h> // abort()
36 #include <string.h> // memcpy()
37 #include <limits.h> // INT_MAX
44 using v8::EscapableHandleScope;
45 using v8::FunctionCallbackInfo;
47 using v8::HandleScope;
52 using v8::PropertyCallbackInfo;
// StreamWrap constructor.
// Forwards `env`, `object`, the stream (reinterpreted as a uv_handle_t*) and
// `provider` to the HandleWrap base class, constructs `default_callbacks_`
// with a back-pointer to this wrap, and makes `callbacks_` point at that
// default instance.
// NOTE(review): the parameter list and the initializer list are only partly
// visible in this excerpt (the embedded line numbers skip) -- confirm the
// remaining parameters/initializers against the full file.
59 StreamWrap::StreamWrap(Environment* env,
62 AsyncWrap::ProviderType provider)
63 : HandleWrap(env, object, reinterpret_cast<uv_handle_t*>(stream), provider),
65 default_callbacks_(this),
66 callbacks_(&default_callbacks_) {
// Property accessor that reports the stream's file descriptor to JavaScript.
// The first (property name) parameter is unnamed and unused.
// NOTE(review): the declaration and initial value of `fd` are not visible in
// this excerpt, so the value returned when the wrap or stream is NULL cannot
// be confirmed from here.
70 void StreamWrap::GetFD(Local<String>, const PropertyCallbackInfo<Value>& args) {
72 Environment* env = Environment::GetCurrent(args.GetIsolate());
73 HandleScope scope(env->isolate());
74 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
// Only touch the libuv handle when both the wrap and its stream exist.
76 if (wrap != NULL && wrap->stream() != NULL) {
77 fd = wrap->stream()->io_watcher.fd;
79 args.GetReturnValue().Set(fd);
// Mirrors libuv's `write_queue_size` for this stream onto the JavaScript
// object: the value is wrapped as an unsigned v8::Integer and stored under
// the property named by env()->write_queue_size_string().
84 void StreamWrap::UpdateWriteQueueSize() {
85 HandleScope scope(env()->isolate());
86 Local<Integer> write_queue_size =
87 Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
88 object()->Set(env()->write_queue_size_string(), write_queue_size);
// JS binding: starts reading from the wrapped stream.
// Calls uv_read_start() with OnAlloc as the allocation callback and OnRead as
// the read callback, and hands the libuv result code back to JavaScript as
// the return value.
// NOTE(review): some lines of this function are not visible in this excerpt.
91 void StreamWrap::ReadStart(const FunctionCallbackInfo<Value>& args) {
92 Environment* env = Environment::GetCurrent(args.GetIsolate());
93 HandleScope scope(env->isolate());
95 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
97 int err = uv_read_start(wrap->stream(), OnAlloc, OnRead);
99 args.GetReturnValue().Set(err);
// JS binding: stops reading from the wrapped stream.
// Calls uv_read_stop() on the stream and returns the libuv result code to
// JavaScript.
103 void StreamWrap::ReadStop(const FunctionCallbackInfo<Value>& args) {
104 Environment* env = Environment::GetCurrent(args.GetIsolate());
105 HandleScope scope(env->isolate());
107 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
109 int err = uv_read_stop(wrap->stream());
110 args.GetReturnValue().Set(err);
// Allocation callback passed to uv_read_start() (see ReadStart).
// Recovers the StreamWrap from `handle->data`, asserts that the handle is the
// wrap's own stream, and delegates the allocation to the currently installed
// callbacks object via DoAlloc().
// NOTE(review): the `buf` parameter declaration is not visible in this
// excerpt; it is forwarded unchanged to DoAlloc().
114 void StreamWrap::OnAlloc(uv_handle_t* handle,
115 size_t suggested_size,
117 StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
118 assert(wrap->stream() == reinterpret_cast<uv_stream_t*>(handle));
119 wrap->callbacks()->DoAlloc(handle, suggested_size, buf);
// Accepts a pending handle from `pipe` into a freshly instantiated wrapper.
//
// WrapType is the wrapper class (must provide a static Instantiate(env) and a
// UVHandle() member); UVType is the matching libuv handle type.
// Returns the new wrapper object escaped from the local handle scope, or an
// empty Local<Object> when WrapType::Instantiate() yields an empty handle.
// NOTE(review): the declaration of `handle` and the statement executed when
// uv_accept() returns non-zero are not visible in this excerpt.
123 template <class WrapType, class UVType>
124 static Local<Object> AcceptHandle(Environment* env, uv_stream_t* pipe) {
125 EscapableHandleScope scope(env->isolate());
126 Local<Object> wrap_obj;
129 wrap_obj = WrapType::Instantiate(env);
// Instantiation failed: report it to the caller with an empty handle.
130 if (wrap_obj.IsEmpty())
131 return Local<Object>();
133 WrapType* wrap = Unwrap<WrapType>(wrap_obj);
134 handle = wrap->UVHandle();
// uv_accept() returns non-zero on failure.
136 if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
139 return scope.Escape(wrap_obj);
// Shared tail of the read callbacks: updates the receive counters and hands
// the data to the installed callbacks object via DoRead().
// `pending` is the type of a handle waiting to be accepted, as determined by
// the caller (see OnRead).
// NOTE(review): the `nread`/`buf` parameter declarations and any condition
// guarding the counter updates are not visible in this excerpt.
143 void StreamWrap::OnReadCommon(uv_stream_t* handle,
146 uv_handle_type pending) {
147 StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
149 // We should not be getting this callback if someone has already called
150 // uv_close() on the handle.
151 assert(wrap->persistent().IsEmpty() == false);
// Account the received bytes against the TCP or named-pipe counter.
154 if (wrap->is_tcp()) {
155 NODE_COUNT_NET_BYTES_RECV(nread);
156 } else if (wrap->is_named_pipe()) {
157 NODE_COUNT_PIPE_BYTES_RECV(nread);
161 wrap->callbacks()->DoRead(handle, nread, buf, pending);
// Read callback passed to uv_read_start() (see ReadStart).
// Determines the type of a pending handle, if any, and forwards everything to
// OnReadCommon(). The pending type stays UV_UNKNOWN_HANDLE unless the wrap is
// an IPC named pipe with at least one pending handle.
// NOTE(review): the `nread` parameter declaration is not visible in this
// excerpt.
165 void StreamWrap::OnRead(uv_stream_t* handle,
167 const uv_buf_t* buf) {
168 StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
169 uv_handle_type type = UV_UNKNOWN_HANDLE;
// Only IPC pipes can carry handles; query libuv for the pending handle type.
171 if (wrap->is_named_pipe_ipc() &&
172 uv_pipe_pending_count(reinterpret_cast<uv_pipe_t*>(handle)) > 0) {
173 type = uv_pipe_pending_type(reinterpret_cast<uv_pipe_t*>(handle));
176 OnReadCommon(handle, nread, buf, type);
// Helper: points `buf` at the memory of the Buffer object `val` (no copy).
// Asserts that `val` is a Buffer instance.
// NOTE(review): the return statement is not visible in this excerpt, so the
// meaning of the size_t result cannot be confirmed from here.
180 size_t StreamWrap::WriteBuffer(Handle<Value> val, uv_buf_t* buf) {
181 assert(Buffer::HasInstance(val));
183 // Simple non-writev case
184 buf->base = Buffer::Data(val);
185 buf->len = Buffer::Length(val);
// JS binding: writes a Buffer to the stream.
//
// Arguments (asserted): args[0] is the request object, args[1] is a Buffer.
// Visible flow: first attempt a synchronous write through TryWrite(); then
// placement-construct a WriteWrap in a raw char array and queue the write
// through DoWrite() with AfterWrite as the completion callback. The request
// object gets an `async` flag, the error message reported by the callbacks
// object, and the buffer length under bytes_string; the libuv result code is
// returned to JavaScript.
// NOTE(review): many lines are not visible in this excerpt (declarations of
// `buf`, `count`, `storage`, `req_wrap`; the branches after TryWrite/DoWrite;
// the conditions around the destructor call and the error message) -- the
// exact control flow must be confirmed against the full file.
191 void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
192 HandleScope handle_scope(args.GetIsolate());
193 Environment* env = Environment::GetCurrent(args.GetIsolate());
195 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
197 assert(args[0]->IsObject());
198 assert(Buffer::HasInstance(args[1]));
200 Local<Object> req_wrap_obj = args[0].As<Object>();
201 Local<Object> buf_obj = args[1].As<Object>();
// Remember the full length up front; it is what gets reported as `bytes`.
203 size_t length = Buffer::Length(buf_obj);
208 WriteBuffer(buf_obj, &buf);
210 // Try writing immediately without allocation
211 uv_buf_t* bufs = &buf;
213 int err = wrap->callbacks()->TryWrite(&bufs, &count);
220 // Allocate, or write rest
// The WriteWrap lives in a raw char array and is built with placement new, so
// it must be torn down with an explicit destructor call (see below).
221 storage = new char[sizeof(WriteWrap)];
222 req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
224 err = wrap->callbacks()->DoWrite(req_wrap,
228 StreamWrap::AfterWrite);
229 req_wrap->Dispatched();
230 req_wrap_obj->Set(env->async(), True(env->isolate()));
// NOTE(review): the guard around this destructor call and the release of
// `storage` are not visible here -- presumably the DoWrite() failure path.
233 req_wrap->~WriteWrap();
238 const char* msg = wrap->callbacks()->Error();
240 req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
241 req_wrap_obj->Set(env->bytes_string(),
242 Integer::NewFromUnsigned(env->isolate(), length));
243 args.GetReturnValue().Set(err);
// Shared implementation behind the Write*String JS bindings; the string
// encoding is a template parameter.
//
// Arguments (asserted): args[0] is the request object, args[1] is a string.
// args[2], when it is an object, is a handle to send along with the write.
// Visible flow:
//   1. Size the flattened string; refuse with UV_ENOBUFS above INT_MAX.
//   2. For small payloads, encode into a 16 KB stack buffer and attempt a
//      synchronous TryWrite().
//   3. Otherwise allocate one char array holding the WriteWrap followed by
//      the 16-byte-aligned string data, and queue the write via DoWrite().
//   4. Record async/error/bytes on the request object and return the libuv
//      result code.
// NOTE(review): many lines are not visible in this excerpt (declarations,
// `else` branches, early returns, argument lists of multi-line calls) -- the
// exact control flow must be confirmed against the full file.
247 template <enum encoding encoding>
248 void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
249 HandleScope handle_scope(args.GetIsolate());
250 Environment* env = Environment::GetCurrent(args.GetIsolate());
253 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
255 assert(args[0]->IsObject());
256 assert(args[1]->IsString());
258 Local<Object> req_wrap_obj = args[0].As<Object>();
259 Local<String> string = args[1].As<String>();
261 // Compute the size of the storage that the string will be flattened into.
262 // For UTF8 strings that are very long, go ahead and take the hit for
263 // computing their actual size, rather than tripling the storage.
265 if (encoding == UTF8 && string->Length() > 65535)
266 storage_size = StringBytes::Size(env->isolate(), string, encoding);
268 storage_size = StringBytes::StorageSize(env->isolate(), string, encoding);
// Reject payloads whose size exceeds INT_MAX.
270 if (storage_size > INT_MAX) {
271 args.GetReturnValue().Set(UV_ENOBUFS);
275 // Try writing immediately if write size isn't too big
279 char stack_storage[16384]; // 16kb
// The synchronous attempt is skipped when the payload (plus 15 bytes of
// slack) does not fit the stack buffer, or when this is an IPC pipe and
// args[2] is an object (a handle to send).
283 bool try_write = storage_size + 15 <= sizeof(stack_storage) &&
284 (!wrap->is_named_pipe_ipc() || !args[2]->IsObject());
286 data_size = StringBytes::Write(env->isolate(),
291 buf = uv_buf_init(stack_storage, data_size);
293 uv_buf_t* bufs = &buf;
295 err = wrap->callbacks()->TryWrite(&bufs, &count);
// One allocation: WriteWrap header + string bytes + up to 15 bytes of padding
// so the data can start on a 16-byte boundary.
309 storage = new char[sizeof(WriteWrap) + storage_size + 15];
310 req_wrap = new(storage) WriteWrap(env, req_wrap_obj, wrap);
312 data = reinterpret_cast<char*>(ROUND_UP(
313 reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
// NOTE(review): the conditions selecting between this copy and the
// StringBytes::Write() call below are not visible here -- presumably the copy
// reuses what the TryWrite() attempt left unwritten.
317 memcpy(data, buf.base, buf.len);
321 data_size = StringBytes::Write(env->isolate(),
328 assert(data_size <= storage_size);
330 buf = uv_buf_init(data, data_size);
332 if (!wrap->is_named_pipe_ipc()) {
333 err = wrap->callbacks()->DoWrite(req_wrap,
337 StreamWrap::AfterWrite);
339 uv_handle_t* send_handle = NULL;
341 if (args[2]->IsObject()) {
342 Local<Object> send_handle_obj = args[2].As<Object>();
// NOTE(review): this local `wrap` (HandleWrap*) shadows the function-level
// `wrap` (StreamWrap*) for the remainder of its scope.
343 HandleWrap* wrap = Unwrap<HandleWrap>(send_handle_obj);
344 send_handle = wrap->GetHandle();
345 // Reference StreamWrap instance to prevent it from being garbage
346 // collected before `AfterWrite` is called.
347 assert(!req_wrap->persistent().IsEmpty());
348 req_wrap->object()->Set(env->handle_string(), send_handle_obj);
351 err = wrap->callbacks()->DoWrite(
355 reinterpret_cast<uv_stream_t*>(send_handle),
356 StreamWrap::AfterWrite);
359 req_wrap->Dispatched();
360 req_wrap->object()->Set(env->async(), True(env->isolate()));
// NOTE(review): the guard around this destructor call and the release of
// `storage` are not visible here -- presumably the DoWrite() failure path.
363 req_wrap->~WriteWrap();
368 const char* msg = wrap->callbacks()->Error();
370 req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
371 req_wrap_obj->Set(env->bytes_string(),
372 Integer::NewFromUnsigned(env->isolate(), data_size));
373 args.GetReturnValue().Set(err);
// JS binding: vectored write of several chunks in one request.
//
// Arguments (asserted): args[0] is the request object, args[1] is an array.
// The array is read in pairs: element 2*i is the chunk (Buffer or a value
// converted with ToString()), element 2*i+1 is passed to ParseEncoding().
// Visible flow:
//   1. First pass: sum the storage needed for string chunks (each gets 15
//      bytes of alignment slack); refuse with UV_ENOBUFS above INT_MAX.
//   2. Allocate one char array holding the WriteWrap followed by all string
//      data, and placement-construct the WriteWrap in it.
//   3. Second pass: Buffer chunks are referenced in place; string chunks are
//      encoded at 16-byte-aligned offsets inside the shared storage.
//   4. Queue the write via DoWrite(), record async/bytes/error on the request
//      object and return the libuv result code.
// NOTE(review): many lines are not visible in this excerpt (declarations of
// `bufs_`, `chunk_size`, `bytes`; `else`/`continue` statements; the release of
// a heap-allocated `bufs`; the guards around the error and destructor paths)
// -- the exact control flow must be confirmed against the full file.
377 void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
378 HandleScope handle_scope(args.GetIsolate());
379 Environment* env = Environment::GetCurrent(args.GetIsolate());
381 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
383 assert(args[0]->IsObject());
384 assert(args[1]->IsArray());
386 Local<Object> req_wrap_obj = args[0].As<Object>();
387 Local<Array> chunks = args[1].As<Array>();
// The array holds (chunk, encoding) pairs, hence half as many chunks.
388 size_t count = chunks->Length() >> 1;
391 uv_buf_t* bufs = bufs_;
393 // Determine storage size first
394 size_t storage_size = 0;
395 for (size_t i = 0; i < count; i++) {
396 Handle<Value> chunk = chunks->Get(i * 2);
398 if (Buffer::HasInstance(chunk))
400 // Buffer chunk, no additional storage required
403 Handle<String> string = chunk->ToString();
404 enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
// Same sizing rule as WriteStringImpl: exact size for long UTF8 strings,
// StorageSize() otherwise.
406 if (encoding == UTF8 && string->Length() > 65535)
407 chunk_size = StringBytes::Size(env->isolate(), string, encoding);
409 chunk_size = StringBytes::StorageSize(env->isolate(), string, encoding);
// +15 leaves room to round each chunk's start up to a 16-byte boundary.
411 storage_size += chunk_size + 15;
414 if (storage_size > INT_MAX) {
415 args.GetReturnValue().Set(UV_ENOBUFS);
// Fall back to a heap-allocated buffer array when `bufs_` is too small.
419 if (ARRAY_SIZE(bufs_) < count)
420 bufs = new uv_buf_t[count];
422 storage_size += sizeof(WriteWrap);
423 char* storage = new char[storage_size];
424 WriteWrap* req_wrap =
425 new(storage) WriteWrap(env, req_wrap_obj, wrap);
// String data starts right after the WriteWrap header.
428 size_t offset = sizeof(WriteWrap);
429 for (size_t i = 0; i < count; i++) {
430 Handle<Value> chunk = chunks->Get(i * 2);
433 if (Buffer::HasInstance(chunk)) {
434 bufs[i].base = Buffer::Data(chunk);
435 bufs[i].len = Buffer::Length(chunk);
436 bytes += bufs[i].len;
441 offset = ROUND_UP(offset, 16);
442 assert(offset < storage_size);
443 char* str_storage = storage + offset;
444 size_t str_size = storage_size - offset;
446 Handle<String> string = chunk->ToString();
447 enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
448 str_size = StringBytes::Write(env->isolate(),
453 bufs[i].base = str_storage;
454 bufs[i].len = str_size;
459 int err = wrap->callbacks()->DoWrite(req_wrap,
463 StreamWrap::AfterWrite);
469 req_wrap->Dispatched();
470 req_wrap->object()->Set(env->async(), True(env->isolate()));
471 req_wrap->object()->Set(env->bytes_string(),
472 Number::New(env->isolate(), bytes));
473 const char* msg = wrap->callbacks()->Error();
475 req_wrap_obj->Set(env->error_string(), OneByteString(env->isolate(), msg));
// NOTE(review): the guard around this destructor call and the release of
// `storage` are not visible here -- presumably the DoWrite() failure path.
478 req_wrap->~WriteWrap();
482 args.GetReturnValue().Set(err);
// JS binding: writes a string to the stream using the ASCII encoding.
// Thin wrapper that forwards to WriteStringImpl<ASCII>().
486 void StreamWrap::WriteAsciiString(const FunctionCallbackInfo<Value>& args) {
487 WriteStringImpl<ASCII>(args);
// JS binding: writes a string to the stream using the UTF8 encoding.
// Thin wrapper that forwards to WriteStringImpl<UTF8>().
491 void StreamWrap::WriteUtf8String(const FunctionCallbackInfo<Value>& args) {
492 WriteStringImpl<UTF8>(args);
// JS binding: writes a string to the stream using the UCS2 encoding.
// Thin wrapper that forwards to WriteStringImpl<UCS2>().
496 void StreamWrap::WriteUcs2String(const FunctionCallbackInfo<Value>& args) {
497 WriteStringImpl<UCS2>(args);
// JS binding: switches the stream between blocking and non-blocking mode.
// Requires at least one argument (asserted); blocking mode is enabled only
// when args[0] is exactly the JavaScript value `true`. The result code of
// uv_stream_set_blocking() is returned to JavaScript.
500 void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
501 Environment* env = Environment::GetCurrent(args.GetIsolate());
502 HandleScope scope(env->isolate());
504 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
506 assert(args.Length() > 0);
507 int err = uv_stream_set_blocking(wrap->stream(), args[0]->IsTrue());
509 args.GetReturnValue().Set(err);
// Completion callback for writes queued through DoWrite().
// Recovers the WriteWrap from the embedded uv_write_t, drops the reference to
// a sent handle, lets the callbacks object run its own AfterWrite() hook,
// invokes the request's `oncomplete` function with `status` first, and
// finally destroys the WriteWrap and frees its storage.
// NOTE(review): parts of the `argv` initializer and the condition guarding the
// error-message assignment are not visible in this excerpt.
512 void StreamWrap::AfterWrite(uv_write_t* req, int status) {
513 WriteWrap* req_wrap = CONTAINER_OF(req, WriteWrap, req_);
514 StreamWrap* wrap = req_wrap->wrap();
515 Environment* env = wrap->env();
517 HandleScope handle_scope(env->isolate());
518 Context::Scope context_scope(env->context());
520 // The wrap and request objects should still be there.
521 assert(req_wrap->persistent().IsEmpty() == false);
522 assert(wrap->persistent().IsEmpty() == false);
524 // Unref handle property
525 Local<Object> req_wrap_obj = req_wrap->object();
526 req_wrap_obj->Delete(env->handle_string());
527 wrap->callbacks()->AfterWrite(req_wrap);
529 Local<Value> argv[] = {
530 Integer::New(env->isolate(), status),
533 Undefined(env->isolate())
536 const char* msg = wrap->callbacks()->Error();
538 argv[3] = OneByteString(env->isolate(), msg);
540 req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
// The write paths in this file placement-construct the WriteWrap inside a
// `new char[...]` array, so teardown is an explicit destructor call followed
// by delete[] on the char storage.
542 req_wrap->~WriteWrap();
543 delete[] reinterpret_cast<char*>(req_wrap);
// JS binding: shuts down the stream.
// args[0] must be the request object (asserted). A ShutdownWrap is allocated
// for it and handed to the callbacks object's DoShutdown() with AfterShutdown
// as the completion callback; the result code is returned to JavaScript.
// NOTE(review): part of the ShutdownWrap constructor call and whatever
// happens between Dispatched() and the return (e.g. cleanup when DoShutdown()
// fails) are not visible in this excerpt.
547 void StreamWrap::Shutdown(const FunctionCallbackInfo<Value>& args) {
548 HandleScope handle_scope(args.GetIsolate());
549 Environment* env = Environment::GetCurrent(args.GetIsolate());
551 StreamWrap* wrap = Unwrap<StreamWrap>(args.This());
553 assert(args[0]->IsObject());
554 Local<Object> req_wrap_obj = args[0].As<Object>();
556 ShutdownWrap* req_wrap = new ShutdownWrap(env,
558 AsyncWrap::PROVIDER_SHUTDOWNWRAP);
559 int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown);
560 req_wrap->Dispatched();
563 args.GetReturnValue().Set(err);
// Completion callback for uv_shutdown() requests started by Shutdown().
// Recovers the ShutdownWrap from `req->data` and the StreamWrap from the
// request's handle, then invokes the request's `oncomplete` function with a
// three-element argument list whose first element is `status`.
// NOTE(review): the remaining `argv` elements and any cleanup of `req_wrap`
// after the callback are not visible in this excerpt.
567 void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
568 ShutdownWrap* req_wrap = static_cast<ShutdownWrap*>(req->data);
569 StreamWrap* wrap = static_cast<StreamWrap*>(req->handle->data);
570 Environment* env = wrap->env();
572 // The wrap and request objects should still be there.
573 assert(req_wrap->persistent().IsEmpty() == false);
574 assert(wrap->persistent().IsEmpty() == false);
576 HandleScope handle_scope(env->isolate());
577 Context::Scope context_scope(env->context());
579 Local<Object> req_wrap_obj = req_wrap->object();
580 Local<Value> argv[3] = {
581 Integer::New(env->isolate(), status),
586 req_wrap->MakeCallback(env->oncomplete_string(), ARRAY_SIZE(argv), argv);
// Returns a C string that the write paths and AfterWrite in this file store
// as the request's error message.
// NOTE(review): the body is not visible in this excerpt, so the value
// returned by this default implementation cannot be confirmed from here.
592 const char* StreamWrapCallbacks::Error() {
// Attempts a synchronous write of `*count` buffers starting at `*bufs` using
// uv_try_write(), then walks the buffer list to skip what was written.
// NOTE(review): several lines are not visible in this excerpt (the end of the
// comment below, the declarations of `err`/`written`, the handling of
// UV_ENOSYS and other errors, the write-back to `*bufs`/`*count`, and the
// return statement) -- confirm the return contract against the full file.
597 // NOTE: Call to this function could change both `buf`'s and `count`'s
598 // values, shifting their base and decrementing their length. This is
599 // required in order to skip the data that was successfully written via
601 int StreamWrapCallbacks::TryWrite(uv_buf_t** bufs, size_t* count) {
604 uv_buf_t* vbufs = *bufs;
605 size_t vcount = *count;
607 err = uv_try_write(wrap()->stream(), vbufs, vcount);
608 if (err == UV_ENOSYS)
613 // Slice off the buffers: skip all written buffers and slice the one that
614 // was partially written.
616 for (; written != 0 && vcount > 0; vbufs++, vcount--) {
// Partially written buffer: advance its base past the written bytes.
618 if (vbufs[0].len > written) {
619 vbufs[0].base += written;
620 vbufs[0].len -= written;
// Fully written buffer: consume its length from the written total.
626 written -= vbufs[0].len;
// Queues an asynchronous write on the wrapped stream.
// Uses uv_write() when there is no handle to send and uv_write2() when
// `send_handle` is non-NULL, then sums the buffer lengths into the TCP or
// named-pipe "bytes sent" counter and refreshes the JS-visible write queue
// size.
// NOTE(review): the `bufs`/`count`/`cb` parameter declarations, the condition
// guarding the byte accounting, and the return statement are not visible in
// this excerpt.
637 int StreamWrapCallbacks::DoWrite(WriteWrap* w,
640 uv_stream_t* send_handle,
643 if (send_handle == NULL) {
644 r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
646 r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb);
651 for (size_t i = 0; i < count; i++)
652 bytes += bufs[i].len;
653 if (wrap()->stream()->type == UV_TCP) {
654 NODE_COUNT_NET_BYTES_SENT(bytes);
655 } else if (wrap()->stream()->type == UV_NAMED_PIPE) {
656 NODE_COUNT_PIPE_BYTES_SENT(bytes);
660 wrap()->UpdateWriteQueueSize();
// Hook run by StreamWrap::AfterWrite once a write has completed.
// This implementation ignores `w` and only refreshes the JS-visible write
// queue size.
666 void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
667 wrap()->UpdateWriteQueueSize();
// Allocation hook for reads: obtains `suggested_size` bytes with malloc() and
// stores the pointer and length in `buf`.
// A NULL result for a non-zero request is treated as an error and reported
// together with this function's signature string.
// NOTE(review): the `buf` parameter declaration and the call that receives
// the signature string are not visible in this excerpt -- presumably a fatal
// out-of-memory report; confirm against the full file.
671 void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
672 size_t suggested_size,
674 buf->base = static_cast<char*>(malloc(suggested_size));
675 buf->len = suggested_size;
// malloc(0) may legitimately return NULL, hence the size check.
677 if (buf->base == NULL && suggested_size > 0) {
679 "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)",
// Read hook: delivers the outcome of a read to the JS `onread` function.
// The argument list is { nread, buffer-or-undefined, pending-handle-or-
// undefined }. On the data path the malloc'ed read buffer is shrunk to
// `nread` bytes and handed to Buffer::Use(); a pending TCP, named-pipe or UDP
// handle is accepted into a new wrapper object and passed as the third
// argument.
// NOTE(review): several lines are not visible in this excerpt (the `nread`
// and `buf` parameter declarations, the conditions selecting the two early
// paths, the statements under the `buf->base != NULL` checks, and the early
// returns) -- confirm the error/EOF and zero-length handling against the full
// file.
685 void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
688 uv_handle_type pending) {
689 Environment* env = wrap()->env();
690 HandleScope handle_scope(env->isolate());
691 Context::Scope context_scope(env->context());
693 Local<Value> argv[] = {
694 Integer::New(env->isolate(), nread),
695 Undefined(env->isolate()),
696 Undefined(env->isolate())
700 if (buf->base != NULL)
702 wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
707 if (buf->base != NULL)
// Shrink the allocation to the bytes actually read before wrapping it.
// NOTE(review): no check of the realloc() result is visible in this excerpt.
712 char* base = static_cast<char*>(realloc(buf->base, nread));
713 assert(static_cast<size_t>(nread) <= buf->len);
714 argv[1] = Buffer::Use(env, base, nread);
716 Local<Object> pending_obj;
717 if (pending == UV_TCP) {
718 pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, handle);
719 } else if (pending == UV_NAMED_PIPE) {
720 pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, handle);
721 } else if (pending == UV_UDP) {
722 pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, handle);
724 assert(pending == UV_UNKNOWN_HANDLE);
// AcceptHandle() yields an empty handle when instantiation fails; in that
// case argv[2] stays undefined.
727 if (!pending_obj.IsEmpty()) {
728 argv[2] = pending_obj;
731 wrap()->MakeCallback(env->onread_string(), ARRAY_SIZE(argv), argv);
// Shutdown hook: starts a uv_shutdown() on the wrapped stream using the
// request embedded in `req_wrap`, with `cb` as the completion callback.
// Returns the result code of uv_shutdown().
735 int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
736 return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb);