streams: introduce StreamWrap and JSStream
authorFedor Indutny <fedor@indutny.com>
Mon, 23 Feb 2015 20:09:44 +0000 (23:09 +0300)
committerFedor Indutny <fedor@indutny.com>
Tue, 24 Feb 2015 19:38:21 +0000 (22:38 +0300)
Introduce a way to wrap plain-js `stream.Duplex` streams into C++
StreamBase's child class. With such method at hand it is now possible to
pass `stream.Duplex` instance as a `socket` parameter to
`tls.connect()`.

PR-URL: https://github.com/iojs/io.js/pull/926
Reviewed-By: Chris Dickinson <christopher.s.dickinson@gmail.com>
15 files changed:
lib/_stream_wrap.js [new file with mode: 0644]
lib/_tls_wrap.js
node.gyp
src/async-wrap.h
src/env.h
src/js_stream.cc
src/js_stream.h
src/node_wrap.h
src/stream_base.cc
src/stream_base.h
src/stream_wrap.cc
src/stream_wrap.h
src/tls_wrap.cc
src/tls_wrap.h
test/parallel/test-tls-js-stream.js [new file with mode: 0644]

diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js
new file mode 100644 (file)
index 0000000..c3dcfe5
--- /dev/null
@@ -0,0 +1,118 @@
+const util = require('util');
+const Socket = require('net').Socket;
+const JSStream = process.binding('js_stream').JSStream;
+const uv = process.binding('uv');
+
+function StreamWrap(stream) {
+  var handle = new JSStream();
+
+  this.stream = stream;
+
+  var self = this;
+  handle.close = function(cb) {
+    cb();
+  };
+  handle.isAlive = function() {
+    return self.isAlive();
+  };
+  handle.isClosing = function() {
+    return self.isClosing();
+  };
+  handle.onreadstart = function() {
+    return self.readStart();
+  };
+  handle.onreadstop = function() {
+    return self.readStop();
+  };
+  handle.onshutdown = function(req) {
+    return self.shutdown(req);
+  };
+  handle.onwrite = function(req, bufs) {
+    return self.write(req, bufs);
+  };
+
+  this.stream.pause();
+  this.stream.on('data', function(chunk) {
+    self._handle.readBuffer(chunk);
+  });
+  this.stream.once('end', function() {
+    self._handle.emitEOF();
+  });
+  this.stream.on('error', function(err) {
+    self.emit('error', err);
+  });
+
+  Socket.call(this, {
+    handle: handle
+  });
+}
+util.inherits(StreamWrap, Socket);
+module.exports = StreamWrap;
+
+// require('_stream_wrap').StreamWrap
+StreamWrap.StreamWrap = StreamWrap;
+
+StreamWrap.prototype.isAlive = function isAlive() {
+  return this.readable && this.writable;
+};
+
+StreamWrap.prototype.isClosing = function isClosing() {
+  return !this.isAlive();
+};
+
+StreamWrap.prototype.readStart = function readStart() {
+  this.stream.resume();
+  return 0;
+};
+
+StreamWrap.prototype.readStop = function readStop() {
+  this.stream.pause();
+  return 0;
+};
+
+StreamWrap.prototype.shutdown = function shutdown(req) {
+  var self = this;
+
+  this.stream.end(function() {
+    // Ensure that write was dispatched
+    setImmediate(function() {
+      self._handle.finishShutdown(req, 0);
+    });
+  });
+  return 0;
+};
+
+StreamWrap.prototype.write = function write(req, bufs) {
+  var pending = bufs.length;
+  var self = this;
+
+  self.stream.cork();
+  bufs.forEach(function(buf) {
+    self.stream.write(buf, done);
+  });
+  self.stream.uncork();
+
+  function done(err) {
+    if (!err && --pending !== 0)
+      return;
+
+    // Ensure that this is called once in case of error
+    pending = 0;
+
+    // Ensure that write was dispatched
+    setImmediate(function() {
+      var errCode = 0;
+      if (err) {
+        if (err.code && uv['UV_' + err.code])
+          errCode = uv['UV_' + err.code];
+        else
+          errCode = uv.UV_EPIPE;
+      }
+
+      self._handle.doAfterWrite(req);
+      self._handle.finishWrite(req, errCode);
+    });
+  }
+
+  return 0;
+};
index 10221b9..41421e1 100644 (file)
@@ -7,6 +7,8 @@ const tls = require('tls');
 const util = require('util');
 const listenerCount = require('events').listenerCount;
 const common = require('_tls_common');
+const StreamWrap = require('_stream_wrap').StreamWrap;
+const Duplex = require('stream').Duplex;
 const debug = util.debuglog('tls');
 const Timer = process.binding('timer_wrap').Timer;
 const tls_wrap = process.binding('tls_wrap');
@@ -224,6 +226,10 @@ function TLSSocket(socket, options) {
   this.authorized = false;
   this.authorizationError = null;
 
+  // Wrap plain JS Stream into StreamWrap
+  if (!(socket instanceof net.Socket) && socket instanceof Duplex)
+    socket = new StreamWrap(socket);
+
   // Just a documented property to make secure sockets
   // distinguishable from regular ones.
   this.encrypted = true;
@@ -280,7 +286,8 @@ TLSSocket.prototype._wrapHandle = function(handle) {
   // Proxy HandleWrap, PipeWrap and TCPWrap methods
   proxiedMethods.forEach(function(name) {
     res[name] = function methodProxy() {
-      return handle[name].apply(handle, arguments);
+      if (handle[name])
+        return handle[name].apply(handle, arguments);
     };
   });
 
@@ -373,7 +380,7 @@ TLSSocket.prototype._init = function(socket) {
     this.setTimeout(options.handshakeTimeout, this._handleTimeout);
 
   // Socket already has some buffered data - emulate receiving it
-  if (socket && socket._readableState.length) {
+  if (socket && socket._readableState && socket._readableState.length) {
     var buf;
     while ((buf = socket.read()) !== null)
       ssl.receive(buf);
@@ -388,6 +395,10 @@ TLSSocket.prototype._init = function(socket) {
       self._connecting = false;
       self.emit('connect');
     });
+
+    socket.on('error', function(err) {
+      self._tlsError(err);
+    });
   }
 
   // Assume `tls.connect()`
index 996121e..4af27d8 100644 (file)
--- a/node.gyp
+++ b/node.gyp
@@ -56,6 +56,7 @@
       'lib/_stream_duplex.js',
       'lib/_stream_transform.js',
       'lib/_stream_passthrough.js',
+      'lib/_stream_wrap.js',
       'lib/string_decoder.js',
       'lib/sys.js',
       'lib/timers.js',
@@ -95,6 +96,7 @@
         'src/fs_event_wrap.cc',
         'src/cares_wrap.cc',
         'src/handle_wrap.cc',
+        'src/js_stream.cc',
         'src/node.cc',
         'src/node_buffer.cc',
         'src/node_constants.cc',
         'src/env.h',
         'src/env-inl.h',
         'src/handle_wrap.h',
+        'src/js_stream.h',
         'src/node.h',
         'src/node_buffer.h',
         'src/node_constants.h',
index 86748a5..5e898fe 100644 (file)
@@ -17,6 +17,7 @@ namespace node {
   V(FSREQWRAP)                                                                \
   V(GETADDRINFOREQWRAP)                                                       \
   V(GETNAMEINFOREQWRAP)                                                       \
+  V(JSSTREAM)                                                                 \
   V(PIPEWRAP)                                                                 \
   V(PROCESSWRAP)                                                              \
   V(QUERYWRAP)                                                                \
index c9b4cc0..18fed18 100644 (file)
--- a/src/env.h
+++ b/src/env.h
@@ -107,6 +107,8 @@ namespace node {
   V(ipv4_string, "IPv4")                                                      \
   V(ipv6_lc_string, "ipv6")                                                   \
   V(ipv6_string, "IPv6")                                                      \
+  V(isalive_string, "isAlive")                                                \
+  V(isclosing_string, "isClosing")                                            \
   V(issuer_string, "issuer")                                                  \
   V(issuercert_string, "issuerCertificate")                                   \
   V(kill_signal_string, "killSignal")                                         \
@@ -141,9 +143,13 @@ namespace node {
   V(onnewsessiondone_string, "onnewsessiondone")                              \
   V(onocspresponse_string, "onocspresponse")                                  \
   V(onread_string, "onread")                                                  \
+  V(onreadstart_string, "onreadstart")                                        \
+  V(onreadstop_string, "onreadstop")                                          \
   V(onselect_string, "onselect")                                              \
+  V(onshutdown_string, "onshutdown")                                          \
   V(onsignal_string, "onsignal")                                              \
   V(onstop_string, "onstop")                                                  \
+  V(onwrite_string, "onwrite")                                                \
   V(output_string, "output")                                                  \
   V(order_string, "order")                                                    \
   V(owner_string, "owner")                                                    \
@@ -225,6 +231,7 @@ namespace node {
   V(context, v8::Context)                                                     \
   V(domain_array, v8::Array)                                                  \
   V(fs_stats_constructor_function, v8::Function)                              \
+  V(jsstream_constructor_template, v8::FunctionTemplate)                      \
   V(module_load_list_array, v8::Array)                                        \
   V(pipe_constructor_template, v8::FunctionTemplate)                          \
   V(process_object, v8::Object)                                               \
index 3cc3a89..38ab847 100644 (file)
 #include "async-wrap.h"
 #include "env.h"
 #include "env-inl.h"
+#include "node_buffer.h"
 #include "stream_base.h"
 #include "v8.h"
 
 namespace node {
 
+using v8::Array;
 using v8::Context;
+using v8::External;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
 using v8::Handle;
+using v8::HandleScope;
+using v8::Local;
 using v8::Object;
 using v8::Value;
 
+
+JSStream::JSStream(Environment* env, Handle<Object> obj, AsyncWrap* parent)
+    : StreamBase(env),
+      AsyncWrap(env, obj, AsyncWrap::PROVIDER_JSSTREAM, parent) {
+  node::Wrap(obj, this);
+}
+
+
+JSStream::~JSStream() {
+}
+
+
+void* JSStream::Cast() {
+  return static_cast<void*>(this);
+}
+
+
+AsyncWrap* JSStream::GetAsyncWrap() {
+  return static_cast<AsyncWrap*>(this);
+}
+
+
+bool JSStream::IsAlive() {
+  return MakeCallback(env()->isalive_string(), 0, nullptr)->IsTrue();
+}
+
+
+bool JSStream::IsClosing() {
+  return MakeCallback(env()->isclosing_string(), 0, nullptr)->IsTrue();
+}
+
+
+int JSStream::ReadStart() {
+  return MakeCallback(env()->onreadstart_string(), 0, nullptr)->Int32Value();
+}
+
+
+int JSStream::ReadStop() {
+  return MakeCallback(env()->onreadstop_string(), 0, nullptr)->Int32Value();
+}
+
+
+int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
+  HandleScope scope(env()->isolate());
+
+  Local<Value> argv[] = {
+    req_wrap->object()
+  };
+
+  Local<Value> res =
+      MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv);
+
+  return res->Int32Value();
+}
+
+
+int JSStream::DoWrite(WriteWrap* w,
+                      uv_buf_t* bufs,
+                      size_t count,
+                      uv_stream_t* send_handle) {
+  CHECK_EQ(send_handle, nullptr);
+
+  HandleScope scope(env()->isolate());
+
+  Local<Array> bufs_arr = Array::New(env()->isolate(), count);
+  for (size_t i = 0; i < count; i++)
+    bufs_arr->Set(i, Buffer::New(env(), bufs[0].base, bufs[0].len));
+
+  Local<Value> argv[] = {
+    w->object(),
+    bufs_arr
+  };
+
+  Local<Value> res =
+      MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv);
+
+  return res->Int32Value();
+}
+
+
+void JSStream::New(const FunctionCallbackInfo<Value>& args) {
+  // This constructor should not be exposed to public javascript.
+  // Therefore we assert that we are not trying to call this as a
+  // normal function.
+  CHECK(args.IsConstructCall());
+  Environment* env = Environment::GetCurrent(args);
+  JSStream* wrap;
+
+  if (args.Length() == 0) {
+    wrap = new JSStream(env, args.This(), nullptr);
+  } else if (args[0]->IsExternal()) {
+    void* ptr = args[0].As<External>()->Value();
+    wrap = new JSStream(env, args.This(), static_cast<AsyncWrap*>(ptr));
+  } else {
+    UNREACHABLE();
+  }
+  CHECK(wrap);
+}
+
+
+static void FreeCallback(char* data, void* hint) {
+  // Intentional no-op
+}
+
+
+void JSStream::DoAlloc(const FunctionCallbackInfo<Value>& args) {
+  JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+  uv_buf_t buf;
+  wrap->OnAlloc(args[0]->Int32Value(), &buf);
+  args.GetReturnValue().Set(Buffer::New(wrap->env(),
+                                        buf.base,
+                                        buf.len,
+                                        FreeCallback,
+                                        nullptr));
+}
+
+
+void JSStream::DoRead(const FunctionCallbackInfo<Value>& args) {
+  JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+  CHECK(Buffer::HasInstance(args[1]));
+  uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1]));
+  wrap->OnRead(args[0]->Int32Value(), &buf);
+}
+
+
+void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
+  JSStream* wrap = Unwrap<JSStream>(args.Holder());
+  WriteWrap* w = Unwrap<WriteWrap>(args[0].As<Object>());
+
+  wrap->OnAfterWrite(w);
+}
+
+
+template <class Wrap>
+void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
+  Wrap* w = Unwrap<Wrap>(args[0].As<Object>());
+
+  w->Done(args[0]->Int32Value());
+}
+
+
+void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
+  JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+  CHECK(Buffer::HasInstance(args[0]));
+  char* data = Buffer::Data(args[0]);
+  int len = Buffer::Length(args[0]);
+
+  do {
+    uv_buf_t buf;
+    ssize_t avail = len;
+    wrap->OnAlloc(len, &buf);
+    if (static_cast<ssize_t>(buf.len) < avail)
+      avail = buf.len;
+
+    memcpy(buf.base, data, avail);
+    data += avail;
+    len -= avail;
+    wrap->OnRead(avail, &buf);
+  } while (len != 0);
+}
+
+
+void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
+  JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+  wrap->OnRead(UV_EOF, nullptr);
+}
+
+
 void JSStream::Initialize(Handle<Object> target,
                           Handle<Value> unused,
                           Handle<Context> context) {
+  Environment* env = Environment::GetCurrent(context);
+
+  Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
+  t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"));
+  t->InstanceTemplate()->SetInternalFieldCount(1);
+
+  env->SetProtoMethod(t, "doAlloc", DoAlloc);
+  env->SetProtoMethod(t, "doRead", DoRead);
+  env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite);
+  env->SetProtoMethod(t, "finishWrite", Finish<WriteWrap>);
+  env->SetProtoMethod(t, "finishShutdown", Finish<ShutdownWrap>);
+  env->SetProtoMethod(t, "readBuffer", ReadBuffer);
+  env->SetProtoMethod(t, "emitEOF", EmitEOF);
+
+  StreamBase::AddMethods<JSStream>(env, t);
+  target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"),
+              t->GetFunction());
+  env->set_jsstream_constructor_template(t);
 }
 
 }  // namespace node
+
+NODE_MODULE_CONTEXT_AWARE_BUILTIN(js_stream, node::JSStream::Initialize)
index 6a2d3bf..8e2ff13 100644 (file)
@@ -8,11 +8,39 @@
 
 namespace node {
 
-class JSStream : public StreamBase {
+class JSStream : public StreamBase, public AsyncWrap {
  public:
   static void Initialize(v8::Handle<v8::Object> target,
                          v8::Handle<v8::Value> unused,
                          v8::Handle<v8::Context> context);
+
+  void* Cast() override;
+  bool IsAlive() override;
+  bool IsClosing() override;
+  int ReadStart() override;
+  int ReadStop() override;
+
+  int DoShutdown(ShutdownWrap* req_wrap) override;
+  int DoWrite(WriteWrap* w,
+              uv_buf_t* bufs,
+              size_t count,
+              uv_stream_t* send_handle) override;
+
+ protected:
+  JSStream(Environment* env, v8::Handle<v8::Object> obj, AsyncWrap* parent);
+  ~JSStream();
+
+  AsyncWrap* GetAsyncWrap() override;
+
+  static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+  static void DoAlloc(const v8::FunctionCallbackInfo<v8::Value>& args);
+  static void DoRead(const v8::FunctionCallbackInfo<v8::Value>& args);
+  static void DoAfterWrite(const v8::FunctionCallbackInfo<v8::Value>& args);
+  static void ReadBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
+  static void EmitEOF(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+  template <class Wrap>
+  static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args);
 };
 
 }  // namespace node
index ddd7bd1..58b042a 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "env.h"
 #include "env-inl.h"
+#include "js_stream.h"
 #include "pipe_wrap.h"
 #include "tcp_wrap.h"
 #include "tty_wrap.h"
@@ -40,6 +41,10 @@ namespace node {
             env->tls_wrap_constructor_template()->HasInstance(obj)) {         \
           TLSWrap* const wrap = Unwrap<TLSWrap>(obj);                         \
           BODY                                                                \
+        } else if (env->jsstream_constructor_template().IsEmpty() == false && \
+                   env->jsstream_constructor_template()->HasInstance(obj)) {  \
+          JSStream* const wrap = Unwrap<JSStream>(obj);                       \
+          BODY                                                                \
         }                                                                     \
       });                                                                     \
     } while (0)
index 0a1324b..82b1d65 100644 (file)
@@ -5,6 +5,7 @@
 #include "node_buffer.h"
 #include "env.h"
 #include "env-inl.h"
+#include "js_stream.h"
 #include "string_bytes.h"
 #include "tls_wrap.h"
 #include "util.h"
@@ -34,6 +35,8 @@ template void StreamBase::AddMethods<StreamWrap>(Environment* env,
                                                  Handle<FunctionTemplate> t);
 template void StreamBase::AddMethods<TLSWrap>(Environment* env,
                                               Handle<FunctionTemplate> t);
+template void StreamBase::AddMethods<JSStream>(Environment* env,
+                                               Handle<FunctionTemplate> t);
 
 
 template <class Base>
@@ -488,8 +491,29 @@ void StreamBase::EmitData(ssize_t nread,
 }
 
 
-AsyncWrap* StreamBase::GetAsyncWrap() {
+bool StreamBase::IsIPCPipe() {
+  return false;
+}
+
+
+int StreamBase::GetFD() {
+  return -1;
+}
+
+
+int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
+  // No TryWrite by default
+  return 0;
+}
+
+
+const char* StreamResource::Error() const {
   return nullptr;
 }
 
+
+void StreamResource::ClearError() {
+  // No-op
+}
+
 }  // namespace node
index d6b3a55..87aae05 100644 (file)
@@ -106,13 +106,13 @@ class StreamResource {
   virtual ~StreamResource() = default;
 
   virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
-  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count) = 0;
+  virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
   virtual int DoWrite(WriteWrap* w,
                       uv_buf_t* bufs,
                       size_t count,
                       uv_stream_t* send_handle) = 0;
-  virtual const char* Error() const = 0;
-  virtual void ClearError() = 0;
+  virtual const char* Error() const;
+  virtual void ClearError();
 
   // Events
   inline void OnAfterWrite(WriteWrap* w) {
@@ -127,7 +127,7 @@ class StreamResource {
 
   inline void OnRead(size_t nread,
                      const uv_buf_t* buf,
-                     uv_handle_type pending) {
+                     uv_handle_type pending = UV_UNKNOWN_HANDLE) {
     if (read_cb_ != nullptr)
       read_cb_(nread, buf, pending, read_ctx_);
   }
@@ -163,10 +163,10 @@ class StreamBase : public StreamResource {
                          v8::Handle<v8::FunctionTemplate> target);
 
   virtual void* Cast() = 0;
-  virtual bool IsAlive() const = 0;
-  virtual bool IsClosing() const = 0;
-  virtual bool IsIPCPipe() const = 0;
-  virtual int GetFD() const = 0;
+  virtual bool IsAlive() = 0;
+  virtual bool IsClosing() = 0;
+  virtual bool IsIPCPipe();
+  virtual int GetFD();
 
   virtual int ReadStart() = 0;
   virtual int ReadStop() = 0;
index 3b50f63..c8ea8d2 100644 (file)
@@ -84,7 +84,7 @@ void StreamWrap::AddMethods(Environment* env,
 }
 
 
-int StreamWrap::GetFD() const {
+int StreamWrap::GetFD() {
   int fd = -1;
 #if !defined(_WIN32)
   if (stream() != nullptr)
@@ -94,12 +94,12 @@ int StreamWrap::GetFD() const {
 }
 
 
-bool StreamWrap::IsAlive() const {
+bool StreamWrap::IsAlive() {
   return HandleWrap::IsAlive(this);
 }
 
 
-bool StreamWrap::IsClosing() const {
+bool StreamWrap::IsClosing() {
   return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
 }
 
@@ -114,7 +114,7 @@ AsyncWrap* StreamWrap::GetAsyncWrap() {
 }
 
 
-bool StreamWrap::IsIPCPipe() const {
+bool StreamWrap::IsIPCPipe() {
   return is_named_pipe_ipc();
 }
 
@@ -359,16 +359,6 @@ void StreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) {
   wrap->UpdateWriteQueueSize();
 }
 
-
-const char* StreamWrap::Error() const {
-  return nullptr;
-}
-
-
-void StreamWrap::ClearError() {
-  // No-op
-}
-
 }  // namespace node
 
 NODE_MODULE_CONTEXT_AWARE_BUILTIN(stream_wrap, node::StreamWrap::Initialize)
index ca673b4..99561e8 100644 (file)
@@ -19,11 +19,11 @@ class StreamWrap : public HandleWrap, public StreamBase {
                          v8::Handle<v8::Value> unused,
                          v8::Handle<v8::Context> context);
 
-  int GetFD() const override;
+  int GetFD() override;
   void* Cast() override;
-  bool IsAlive() const override;
-  bool IsClosing() const override;
-  bool IsIPCPipe() const override;
+  bool IsAlive() override;
+  bool IsClosing() override;
+  bool IsIPCPipe() override;
 
   // JavaScript functions
   int ReadStart() override;
@@ -36,8 +36,6 @@ class StreamWrap : public HandleWrap, public StreamBase {
               uv_buf_t* bufs,
               size_t count,
               uv_stream_t* send_handle) override;
-  const char* Error() const override;
-  void ClearError() override;
 
   inline uv_stream_t* stream() const {
     return stream_;
index ab8db69..8605672 100644 (file)
@@ -216,7 +216,7 @@ void TLSWrap::Receive(const FunctionCallbackInfo<Value>& args) {
     size_t copy = buf.len > len ? len : buf.len;
     memcpy(buf.base, data, copy);
     buf.len = copy;
-    wrap->stream_->OnRead(buf.len, &buf, UV_UNKNOWN_HANDLE);
+    wrap->stream_->OnRead(buf.len, &buf);
 
     data += copy;
     len -= copy;
@@ -414,7 +414,7 @@ void TLSWrap::ClearOut() {
       if (static_cast<int>(buf.len) < avail)
         avail = buf.len;
       memcpy(buf.base, out, avail);
-      OnRead(avail, &buf, UV_UNKNOWN_HANDLE);
+      OnRead(avail, &buf);
 
       read -= avail;
     }
@@ -423,7 +423,7 @@ void TLSWrap::ClearOut() {
   int flags = SSL_get_shutdown(ssl_);
   if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) {
     eof_ = true;
-    OnRead(UV_EOF, nullptr, UV_UNKNOWN_HANDLE);
+    OnRead(UV_EOF, nullptr);
   }
 
   if (read == -1) {
@@ -495,22 +495,22 @@ AsyncWrap* TLSWrap::GetAsyncWrap() {
 }
 
 
-bool TLSWrap::IsIPCPipe() const {
+bool TLSWrap::IsIPCPipe() {
   return stream_->IsIPCPipe();
 }
 
 
-int TLSWrap::GetFD() const {
+int TLSWrap::GetFD() {
   return stream_->GetFD();
 }
 
 
-bool TLSWrap::IsAlive() const {
+bool TLSWrap::IsAlive() {
   return stream_->IsAlive();
 }
 
 
-bool TLSWrap::IsClosing() const {
+bool TLSWrap::IsClosing() {
   return stream_->IsClosing();
 }
 
@@ -536,12 +536,6 @@ void TLSWrap::ClearError() {
 }
 
 
-int TLSWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
-  // TODO(indutny): Support it
-  return 0;
-}
-
-
 int TLSWrap::DoWrite(WriteWrap* w,
                      uv_buf_t* bufs,
                      size_t count,
@@ -668,7 +662,7 @@ void TLSWrap::DoRead(ssize_t nread,
 
     HandleScope handle_scope(env()->isolate());
     Context::Scope context_scope(env()->context());
-    OnRead(nread, nullptr, UV_UNKNOWN_HANDLE);
+    OnRead(nread, nullptr);
     return;
   }
 
index 4245205..73a9f84 100644 (file)
@@ -32,16 +32,15 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
                          v8::Handle<v8::Context> context);
 
   void* Cast() override;
-  int GetFD() const override;
-  bool IsAlive() const override;
-  bool IsClosing() const override;
+  int GetFD() override;
+  bool IsAlive() override;
+  bool IsClosing() override;
 
   // JavaScript functions
   int ReadStart() override;
   int ReadStop() override;
 
   int DoShutdown(ShutdownWrap* req_wrap) override;
-  int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
   int DoWrite(WriteWrap* w,
               uv_buf_t* bufs,
               size_t count,
@@ -78,7 +77,7 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
 
   TLSWrap(Environment* env,
           Kind kind,
-          StreamBase* steram,
+          StreamBase* stream,
           v8::Handle<v8::Object> stream_obj,
           v8::Handle<v8::Object> sc);
 
@@ -104,7 +103,7 @@ class TLSWrap : public crypto::SSLWrap<TLSWrap>,
   }
 
   AsyncWrap* GetAsyncWrap() override;
-  bool IsIPCPipe() const override;
+  bool IsIPCPipe() override;
 
   // Resource implementation
   static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
diff --git a/test/parallel/test-tls-js-stream.js b/test/parallel/test-tls-js-stream.js
new file mode 100644 (file)
index 0000000..7caa7e3
--- /dev/null
@@ -0,0 +1,69 @@
+var assert = require('assert');
+var stream = require('stream');
+var tls = require('tls');
+var fs = require('fs');
+var net = require('net');
+
+var common = require('../common');
+
+var connected = {
+  client: 0,
+  server: 0
+};
+
+var server = tls.createServer({
+  key: fs.readFileSync(common.fixturesDir + '/keys/agent1-key.pem'),
+  cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem')
+}, function(c) {
+  console.log('new client');
+  connected.server++;
+  c.end('ohai');
+}).listen(common.PORT, function() {
+  var raw = net.connect(common.PORT);
+
+  var pending = false;
+  raw.on('readable', function() {
+    if (pending)
+      p._read();
+  });
+
+  var p = new stream.Duplex({
+    read: function read() {
+      pending = false;
+
+      var chunk = raw.read();
+      if (chunk) {
+        console.log('read', chunk);
+        this.push(chunk);
+      } else {
+        pending = true;
+      }
+    },
+    write: function write(data, enc, cb) {
+      console.log('write', data, enc);
+      raw.write(data, enc, cb);
+    }
+  });
+
+  var socket = tls.connect({
+    socket: p,
+    rejectUnauthorized: false
+  }, function() {
+    console.log('client secure');
+
+    connected.client++;
+
+    socket.end('hello');
+    socket.resume();
+  });
+
+  socket.once('close', function() {
+    console.log('client close');
+    server.close();
+  });
+});
+
+process.once('exit', function() {
+  assert.equal(connected.client, 1);
+  assert.equal(connected.server, 1);
+});