1 // Copyright Joyent, Inc. and other Node contributors.
3 // Permission is hereby granted, free of charge, to any person obtaining a
4 // copy of this software and associated documentation files (the
5 // "Software"), to deal in the Software without restriction, including
6 // without limitation the rights to use, copy, modify, merge, publish,
7 // distribute, sublicense, and/or sell copies of the Software, and to permit
8 // persons to whom the Software is furnished to do so, subject to the
9 // following conditions:
11 // The above copyright notice and this permission notice shall be included
12 // in all copies or substantial portions of the Software.
14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17 // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18 // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19 // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20 // USE OR OTHER DEALINGS IN THE SOFTWARE.
22 #include "stream_wrap.h"
25 #include "handle_wrap.h"
26 #include "node_buffer.h"
27 #include "node_counters.h"
28 #include "pipe_wrap.h"
33 #include <stdlib.h> // abort()
34 #include <limits.h> // INT_MAX
41 using v8::FunctionCallbackInfo;
43 using v8::HandleScope;
48 using v8::PropertyCallbackInfo;
54 StreamWrap::StreamWrap(Environment* env,
57 : HandleWrap(env, object, reinterpret_cast<uv_handle_t*>(stream))
59 , default_callbacks_(this)
60 , callbacks_(&default_callbacks_) {
64 void StreamWrap::GetFD(Local<String>, const PropertyCallbackInfo<Value>& args) {
66 HandleScope scope(node_isolate);
68 NODE_UNWRAP_NO_ABORT(args.This(), StreamWrap, wrap);
70 if (wrap != NULL && wrap->stream() != NULL) {
71 fd = wrap->stream()->io_watcher.fd;
73 args.GetReturnValue().Set(fd);
78 void StreamWrap::UpdateWriteQueueSize() {
79 HandleScope scope(node_isolate);
80 Local<Integer> write_queue_size =
81 Integer::NewFromUnsigned(stream()->write_queue_size, node_isolate);
82 object()->Set(env()->write_queue_size_string(), write_queue_size);
86 void StreamWrap::ReadStart(const FunctionCallbackInfo<Value>& args) {
87 HandleScope scope(node_isolate);
90 NODE_UNWRAP(args.This(), StreamWrap, wrap);
93 if (wrap->is_named_pipe_ipc()) {
94 err = uv_read2_start(wrap->stream(), OnAlloc, OnRead2);
96 err = uv_read_start(wrap->stream(), OnAlloc, OnRead);
99 args.GetReturnValue().Set(err);
103 void StreamWrap::ReadStop(const FunctionCallbackInfo<Value>& args) {
104 HandleScope scope(node_isolate);
107 NODE_UNWRAP(args.This(), StreamWrap, wrap);
109 int err = uv_read_stop(wrap->stream());
110 args.GetReturnValue().Set(err);
114 void StreamWrap::OnAlloc(uv_handle_t* handle,
115 size_t suggested_size,
117 StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
118 assert(wrap->stream() == reinterpret_cast<uv_stream_t*>(handle));
119 wrap->callbacks()->DoAlloc(handle, suggested_size, buf);
123 template <class WrapType, class UVType>
124 static Local<Object> AcceptHandle(Environment* env, uv_stream_t* pipe) {
125 HandleScope scope(node_isolate);
126 Local<Object> wrap_obj;
129 wrap_obj = WrapType::Instantiate(env);
130 if (wrap_obj.IsEmpty())
131 return Local<Object>();
134 NODE_UNWRAP(wrap_obj, WrapType, wrap);
135 handle = wrap->UVHandle();
137 if (uv_accept(pipe, reinterpret_cast<uv_stream_t*>(handle)))
140 return scope.Close(wrap_obj);
144 void StreamWrap::OnReadCommon(uv_stream_t* handle,
147 uv_handle_type pending) {
148 HandleScope scope(node_isolate);
150 StreamWrap* wrap = static_cast<StreamWrap*>(handle->data);
152 // We should not be getting this callback if someone as already called
153 // uv_close() on the handle.
154 assert(wrap->persistent().IsEmpty() == false);
157 if (wrap->is_tcp()) {
158 NODE_COUNT_NET_BYTES_RECV(nread);
159 } else if (wrap->is_named_pipe()) {
160 NODE_COUNT_PIPE_BYTES_RECV(nread);
164 wrap->callbacks()->DoRead(handle, nread, buf, pending);
168 void StreamWrap::OnRead(uv_stream_t* handle,
170 const uv_buf_t* buf) {
171 OnReadCommon(handle, nread, buf, UV_UNKNOWN_HANDLE);
175 void StreamWrap::OnRead2(uv_pipe_t* handle,
178 uv_handle_type pending) {
179 OnReadCommon(reinterpret_cast<uv_stream_t*>(handle), nread, buf, pending);
183 size_t StreamWrap::WriteBuffer(Handle<Value> val, uv_buf_t* buf) {
184 assert(Buffer::HasInstance(val));
186 // Simple non-writev case
187 buf->base = Buffer::Data(val);
188 buf->len = Buffer::Length(val);
194 void StreamWrap::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
195 Environment* env = Environment::GetCurrent(args.GetIsolate());
196 HandleScope handle_scope(args.GetIsolate());
199 NODE_UNWRAP(args.This(), StreamWrap, wrap);
201 assert(args[0]->IsObject());
202 assert(Buffer::HasInstance(args[1]));
204 Local<Object> req_wrap_obj = args[0].As<Object>();
205 Local<Object> buf_obj = args[1].As<Object>();
207 size_t length = Buffer::Length(buf_obj);
208 char* storage = new char[sizeof(WriteWrap)];
209 WriteWrap* req_wrap =
210 new(storage) WriteWrap(env, req_wrap_obj, wrap);
213 WriteBuffer(buf_obj, &buf);
215 int err = wrap->callbacks()->DoWrite(req_wrap,
219 StreamWrap::AfterWrite);
220 req_wrap->Dispatched();
221 req_wrap_obj->Set(env->bytes_string(),
222 Integer::NewFromUnsigned(length, node_isolate));
225 req_wrap->~WriteWrap();
229 args.GetReturnValue().Set(err);
233 template <enum encoding encoding>
234 void StreamWrap::WriteStringImpl(const FunctionCallbackInfo<Value>& args) {
235 Environment* env = Environment::GetCurrent(args.GetIsolate());
236 HandleScope handle_scope(args.GetIsolate());
240 NODE_UNWRAP(args.This(), StreamWrap, wrap);
242 assert(args[0]->IsObject());
243 assert(args[1]->IsString());
245 Local<Object> req_wrap_obj = args[0].As<Object>();
246 Local<String> string = args[1].As<String>();
248 // Compute the size of the storage that the string will be flattened into.
249 // For UTF8 strings that are very long, go ahead and take the hit for
250 // computing their actual size, rather than tripling the storage.
252 if (encoding == UTF8 && string->Length() > 65535)
253 storage_size = StringBytes::Size(string, encoding);
255 storage_size = StringBytes::StorageSize(string, encoding);
257 if (storage_size > INT_MAX) {
258 args.GetReturnValue().Set(UV_ENOBUFS);
262 char* storage = new char[sizeof(WriteWrap) + storage_size + 15];
263 WriteWrap* req_wrap =
264 new(storage) WriteWrap(env, req_wrap_obj, wrap);
266 char* data = reinterpret_cast<char*>(ROUND_UP(
267 reinterpret_cast<uintptr_t>(storage) + sizeof(WriteWrap), 16));
270 data_size = StringBytes::Write(data, storage_size, string, encoding);
272 assert(data_size <= storage_size);
279 if (!wrap->is_named_pipe_ipc()) {
280 err = wrap->callbacks()->DoWrite(req_wrap,
284 StreamWrap::AfterWrite);
286 uv_handle_t* send_handle = NULL;
288 if (args[2]->IsObject()) {
289 Local<Object> send_handle_obj = args[2].As<Object>();
291 NODE_UNWRAP(send_handle_obj, HandleWrap, wrap);
292 send_handle = wrap->GetHandle();
293 // Reference StreamWrap instance to prevent it from being garbage
294 // collected before `AfterWrite` is called.
295 assert(!req_wrap->persistent().IsEmpty());
296 req_wrap->object()->Set(env->handle_string(), send_handle_obj);
299 err = wrap->callbacks()->DoWrite(
303 reinterpret_cast<uv_stream_t*>(send_handle),
304 StreamWrap::AfterWrite);
307 req_wrap->Dispatched();
308 req_wrap->object()->Set(env->bytes_string(),
309 Number::New(node_isolate, data_size));
312 req_wrap->~WriteWrap();
316 args.GetReturnValue().Set(err);
320 void StreamWrap::Writev(const FunctionCallbackInfo<Value>& args) {
321 Environment* env = Environment::GetCurrent(args.GetIsolate());
322 HandleScope handle_scope(args.GetIsolate());
325 NODE_UNWRAP(args.This(), StreamWrap, wrap);
327 assert(args[0]->IsObject());
328 assert(args[1]->IsArray());
330 Local<Object> req_wrap_obj = args[0].As<Object>();
331 Local<Array> chunks = args[1].As<Array>();
332 size_t count = chunks->Length() >> 1;
335 uv_buf_t* bufs = bufs_;
337 // Determine storage size first
338 size_t storage_size = 0;
339 for (size_t i = 0; i < count; i++) {
340 Handle<Value> chunk = chunks->Get(i * 2);
342 if (Buffer::HasInstance(chunk))
344 // Buffer chunk, no additional storage required
347 Handle<String> string = chunk->ToString();
348 enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
350 if (encoding == UTF8 && string->Length() > 65535)
351 chunk_size = StringBytes::Size(string, encoding);
353 chunk_size = StringBytes::StorageSize(string, encoding);
355 storage_size += chunk_size + 15;
358 if (storage_size > INT_MAX) {
359 args.GetReturnValue().Set(UV_ENOBUFS);
363 if (ARRAY_SIZE(bufs_) < count)
364 bufs = new uv_buf_t[count];
366 storage_size += sizeof(WriteWrap);
367 char* storage = new char[storage_size];
368 WriteWrap* req_wrap =
369 new(storage) WriteWrap(env, req_wrap_obj, wrap);
372 size_t offset = sizeof(WriteWrap);
373 for (size_t i = 0; i < count; i++) {
374 Handle<Value> chunk = chunks->Get(i * 2);
377 if (Buffer::HasInstance(chunk)) {
378 bufs[i].base = Buffer::Data(chunk);
379 bufs[i].len = Buffer::Length(chunk);
380 bytes += bufs[i].len;
385 offset = ROUND_UP(offset, 16);
386 assert(offset < storage_size);
387 char* str_storage = storage + offset;
388 size_t str_size = storage_size - offset;
390 Handle<String> string = chunk->ToString();
391 enum encoding encoding = ParseEncoding(chunks->Get(i * 2 + 1));
392 str_size = StringBytes::Write(str_storage, str_size, string, encoding);
393 bufs[i].base = str_storage;
394 bufs[i].len = str_size;
399 int err = wrap->callbacks()->DoWrite(req_wrap,
403 StreamWrap::AfterWrite);
409 req_wrap->Dispatched();
410 req_wrap->object()->Set(env->bytes_string(),
411 Number::New(node_isolate, bytes));
414 req_wrap->~WriteWrap();
418 args.GetReturnValue().Set(err);
422 void StreamWrap::WriteAsciiString(const FunctionCallbackInfo<Value>& args) {
423 WriteStringImpl<ASCII>(args);
427 void StreamWrap::WriteUtf8String(const FunctionCallbackInfo<Value>& args) {
428 WriteStringImpl<UTF8>(args);
432 void StreamWrap::WriteUcs2String(const FunctionCallbackInfo<Value>& args) {
433 WriteStringImpl<UCS2>(args);
437 void StreamWrap::AfterWrite(uv_write_t* req, int status) {
438 WriteWrap* req_wrap = container_of(req, WriteWrap, req_);
439 StreamWrap* wrap = req_wrap->wrap();
440 Environment* env = wrap->env();
442 Context::Scope context_scope(env->context());
443 HandleScope handle_scope(env->isolate());
445 // The wrap and request objects should still be there.
446 assert(req_wrap->persistent().IsEmpty() == false);
447 assert(wrap->persistent().IsEmpty() == false);
449 // Unref handle property
450 Local<Object> req_wrap_obj = req_wrap->object();
451 req_wrap_obj->Delete(env->handle_string());
452 wrap->callbacks_->AfterWrite(req_wrap);
454 Local<Value> argv[] = {
455 Integer::New(status, node_isolate),
462 env->oncomplete_string(),
466 req_wrap->~WriteWrap();
467 delete[] reinterpret_cast<char*>(req_wrap);
471 void StreamWrap::Shutdown(const FunctionCallbackInfo<Value>& args) {
472 Environment* env = Environment::GetCurrent(args.GetIsolate());
473 HandleScope handle_scope(args.GetIsolate());
476 NODE_UNWRAP(args.This(), StreamWrap, wrap);
478 assert(args[0]->IsObject());
479 Local<Object> req_wrap_obj = args[0].As<Object>();
481 ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj);
482 int err = wrap->callbacks()->DoShutdown(req_wrap, AfterShutdown);
483 req_wrap->Dispatched();
484 if (err) delete req_wrap;
485 args.GetReturnValue().Set(err);
489 void StreamWrap::AfterShutdown(uv_shutdown_t* req, int status) {
490 ShutdownWrap* req_wrap = static_cast<ShutdownWrap*>(req->data);
491 StreamWrap* wrap = static_cast<StreamWrap*>(req->handle->data);
492 Environment* env = wrap->env();
494 // The wrap and request objects should still be there.
495 assert(req_wrap->persistent().IsEmpty() == false);
496 assert(wrap->persistent().IsEmpty() == false);
498 Context::Scope context_scope(env->context());
499 HandleScope handle_scope(env->isolate());
501 Local<Object> req_wrap_obj = req_wrap->object();
502 Local<Value> argv[3] = {
503 Integer::New(status, node_isolate),
510 env->oncomplete_string(),
518 int StreamWrapCallbacks::DoWrite(WriteWrap* w,
521 uv_stream_t* send_handle,
524 if (send_handle == NULL) {
525 r = uv_write(&w->req_, wrap()->stream(), bufs, count, cb);
527 r = uv_write2(&w->req_, wrap()->stream(), bufs, count, send_handle, cb);
532 for (size_t i = 0; i < count; i++)
533 bytes += bufs[i].len;
534 if (wrap()->stream()->type == UV_TCP) {
535 NODE_COUNT_NET_BYTES_SENT(bytes);
536 } else if (wrap()->stream()->type == UV_NAMED_PIPE) {
537 NODE_COUNT_PIPE_BYTES_SENT(bytes);
541 wrap()->UpdateWriteQueueSize();
547 void StreamWrapCallbacks::AfterWrite(WriteWrap* w) {
548 wrap()->UpdateWriteQueueSize();
552 void StreamWrapCallbacks::DoAlloc(uv_handle_t* handle,
553 size_t suggested_size,
555 buf->base = static_cast<char*>(malloc(suggested_size));
556 buf->len = suggested_size;
558 if (buf->base == NULL && suggested_size > 0) {
560 "node::StreamWrapCallbacks::DoAlloc(uv_handle_t*, size_t, uv_buf_t*)",
566 void StreamWrapCallbacks::DoRead(uv_stream_t* handle,
569 uv_handle_type pending) {
570 Environment* env = wrap()->env();
571 Context::Scope context_scope(env->context());
572 HandleScope handle_scope(env->isolate());
574 Local<Value> argv[] = {
575 Integer::New(nread, node_isolate),
581 if (buf->base != NULL)
583 MakeCallback(env, Self(), env->onread_string(), ARRAY_SIZE(argv), argv);
588 if (buf->base != NULL)
593 char* base = static_cast<char*>(realloc(buf->base, nread));
594 assert(static_cast<size_t>(nread) <= buf->len);
595 argv[1] = Buffer::Use(env, base, nread);
597 Local<Object> pending_obj;
598 if (pending == UV_TCP) {
599 pending_obj = AcceptHandle<TCPWrap, uv_tcp_t>(env, handle);
600 } else if (pending == UV_NAMED_PIPE) {
601 pending_obj = AcceptHandle<PipeWrap, uv_pipe_t>(env, handle);
602 } else if (pending == UV_UDP) {
603 pending_obj = AcceptHandle<UDPWrap, uv_udp_t>(env, handle);
605 assert(pending == UV_UNKNOWN_HANDLE);
608 if (!pending_obj.IsEmpty()) {
609 argv[2] = pending_obj;
614 env->onread_string(),
620 int StreamWrapCallbacks::DoShutdown(ShutdownWrap* req_wrap, uv_shutdown_cb cb) {
621 return uv_shutdown(&req_wrap->req_, wrap()->stream(), cb);
625 Handle<Object> StreamWrapCallbacks::Self() {
626 return wrap()->object();