--- /dev/null
+const util = require('util');
+const Socket = require('net').Socket;
+const JSStream = process.binding('js_stream').JSStream;
+const uv = process.binding('uv');
+
+function StreamWrap(stream) {
+ var handle = new JSStream();
+
+ this.stream = stream;
+
+ var self = this;
+ handle.close = function(cb) {
+ cb();
+ };
+ handle.isAlive = function() {
+ return self.isAlive();
+ };
+ handle.isClosing = function() {
+ return self.isClosing();
+ };
+ handle.onreadstart = function() {
+ return self.readStart();
+ };
+ handle.onreadstop = function() {
+ return self.readStop();
+ };
+ handle.onshutdown = function(req) {
+ return self.shutdown(req);
+ };
+ handle.onwrite = function(req, bufs) {
+ return self.write(req, bufs);
+ };
+
+ this.stream.pause();
+ this.stream.on('data', function(chunk) {
+ self._handle.readBuffer(chunk);
+ });
+ this.stream.once('end', function() {
+ self._handle.emitEOF();
+ });
+ this.stream.on('error', function(err) {
+ self.emit('error', err);
+ });
+
+ Socket.call(this, {
+ handle: handle
+ });
+}
+util.inherits(StreamWrap, Socket);
+module.exports = StreamWrap;
+
+// require('_stream_wrap').StreamWrap
+StreamWrap.StreamWrap = StreamWrap;
+
+StreamWrap.prototype.isAlive = function isAlive() {
+ return this.readable && this.writable;
+};
+
+StreamWrap.prototype.isClosing = function isClosing() {
+ return !this.isAlive();
+};
+
+StreamWrap.prototype.readStart = function readStart() {
+ this.stream.resume();
+ return 0;
+};
+
+StreamWrap.prototype.readStop = function readStop() {
+ this.stream.pause();
+ return 0;
+};
+
+StreamWrap.prototype.shutdown = function shutdown(req) {
+ var self = this;
+
+ this.stream.end(function() {
+ // Ensure that write was dispatched
+ setImmediate(function() {
+ self._handle.finishShutdown(req, 0);
+ });
+ });
+ return 0;
+};
+
+StreamWrap.prototype.write = function write(req, bufs) {
+ var pending = bufs.length;
+ var self = this;
+
+ self.stream.cork();
+ bufs.forEach(function(buf) {
+ self.stream.write(buf, done);
+ });
+ self.stream.uncork();
+
+ function done(err) {
+ if (!err && --pending !== 0)
+ return;
+
+ // Ensure that this is called once in case of error
+ pending = 0;
+
+ // Ensure that write was dispatched
+ setImmediate(function() {
+ var errCode = 0;
+ if (err) {
+ if (err.code && uv['UV_' + err.code])
+ errCode = uv['UV_' + err.code];
+ else
+ errCode = uv.UV_EPIPE;
+ }
+
+ self._handle.doAfterWrite(req);
+ self._handle.finishWrite(req, errCode);
+ });
+ }
+
+ return 0;
+};
const util = require('util');
const listenerCount = require('events').listenerCount;
const common = require('_tls_common');
+const StreamWrap = require('_stream_wrap').StreamWrap;
+const Duplex = require('stream').Duplex;
const debug = util.debuglog('tls');
const Timer = process.binding('timer_wrap').Timer;
const tls_wrap = process.binding('tls_wrap');
this.authorized = false;
this.authorizationError = null;
+ // Wrap plain JS Stream into StreamWrap
+ if (!(socket instanceof net.Socket) && socket instanceof Duplex)
+ socket = new StreamWrap(socket);
+
// Just a documented property to make secure sockets
// distinguishable from regular ones.
this.encrypted = true;
// Proxy HandleWrap, PipeWrap and TCPWrap methods
proxiedMethods.forEach(function(name) {
res[name] = function methodProxy() {
- return handle[name].apply(handle, arguments);
+ if (handle[name])
+ return handle[name].apply(handle, arguments);
};
});
this.setTimeout(options.handshakeTimeout, this._handleTimeout);
// Socket already has some buffered data - emulate receiving it
- if (socket && socket._readableState.length) {
+ if (socket && socket._readableState && socket._readableState.length) {
var buf;
while ((buf = socket.read()) !== null)
ssl.receive(buf);
self._connecting = false;
self.emit('connect');
});
+
+ socket.on('error', function(err) {
+ self._tlsError(err);
+ });
}
// Assume `tls.connect()`
'lib/_stream_duplex.js',
'lib/_stream_transform.js',
'lib/_stream_passthrough.js',
+ 'lib/_stream_wrap.js',
'lib/string_decoder.js',
'lib/sys.js',
'lib/timers.js',
'src/fs_event_wrap.cc',
'src/cares_wrap.cc',
'src/handle_wrap.cc',
+ 'src/js_stream.cc',
'src/node.cc',
'src/node_buffer.cc',
'src/node_constants.cc',
'src/env.h',
'src/env-inl.h',
'src/handle_wrap.h',
+ 'src/js_stream.h',
'src/node.h',
'src/node_buffer.h',
'src/node_constants.h',
V(FSREQWRAP) \
V(GETADDRINFOREQWRAP) \
V(GETNAMEINFOREQWRAP) \
+ V(JSSTREAM) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
V(QUERYWRAP) \
V(ipv4_string, "IPv4") \
V(ipv6_lc_string, "ipv6") \
V(ipv6_string, "IPv6") \
+ V(isalive_string, "isAlive") \
+ V(isclosing_string, "isClosing") \
V(issuer_string, "issuer") \
V(issuercert_string, "issuerCertificate") \
V(kill_signal_string, "killSignal") \
V(onnewsessiondone_string, "onnewsessiondone") \
V(onocspresponse_string, "onocspresponse") \
V(onread_string, "onread") \
+ V(onreadstart_string, "onreadstart") \
+ V(onreadstop_string, "onreadstop") \
V(onselect_string, "onselect") \
+ V(onshutdown_string, "onshutdown") \
V(onsignal_string, "onsignal") \
V(onstop_string, "onstop") \
+ V(onwrite_string, "onwrite") \
V(output_string, "output") \
V(order_string, "order") \
V(owner_string, "owner") \
V(context, v8::Context) \
V(domain_array, v8::Array) \
V(fs_stats_constructor_function, v8::Function) \
+ V(jsstream_constructor_template, v8::FunctionTemplate) \
V(module_load_list_array, v8::Array) \
V(pipe_constructor_template, v8::FunctionTemplate) \
V(process_object, v8::Object) \
#include "async-wrap.h"
#include "env.h"
#include "env-inl.h"
+#include "node_buffer.h"
#include "stream_base.h"
#include "v8.h"
namespace node {
+using v8::Array;
using v8::Context;
+using v8::External;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
using v8::Handle;
+using v8::HandleScope;
+using v8::Local;
using v8::Object;
using v8::Value;
+
+JSStream::JSStream(Environment* env, Handle<Object> obj, AsyncWrap* parent)
+ : StreamBase(env),
+ AsyncWrap(env, obj, AsyncWrap::PROVIDER_JSSTREAM, parent) {
+ node::Wrap(obj, this);
+}
+
+
+JSStream::~JSStream() {
+}
+
+
+void* JSStream::Cast() {
+ return static_cast<void*>(this);
+}
+
+
+AsyncWrap* JSStream::GetAsyncWrap() {
+ return static_cast<AsyncWrap*>(this);
+}
+
+
+bool JSStream::IsAlive() {
+ return MakeCallback(env()->isalive_string(), 0, nullptr)->IsTrue();
+}
+
+
+bool JSStream::IsClosing() {
+ return MakeCallback(env()->isclosing_string(), 0, nullptr)->IsTrue();
+}
+
+
+int JSStream::ReadStart() {
+ return MakeCallback(env()->onreadstart_string(), 0, nullptr)->Int32Value();
+}
+
+
+int JSStream::ReadStop() {
+ return MakeCallback(env()->onreadstop_string(), 0, nullptr)->Int32Value();
+}
+
+
+int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
+ HandleScope scope(env()->isolate());
+
+ Local<Value> argv[] = {
+ req_wrap->object()
+ };
+
+ Local<Value> res =
+ MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv);
+
+ return res->Int32Value();
+}
+
+
+int JSStream::DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) {
+ CHECK_EQ(send_handle, nullptr);
+
+ HandleScope scope(env()->isolate());
+
+ Local<Array> bufs_arr = Array::New(env()->isolate(), count);
+ for (size_t i = 0; i < count; i++)
+ bufs_arr->Set(i, Buffer::New(env(), bufs[0].base, bufs[0].len));
+
+ Local<Value> argv[] = {
+ w->object(),
+ bufs_arr
+ };
+
+ Local<Value> res =
+ MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv);
+
+ return res->Int32Value();
+}
+
+
+void JSStream::New(const FunctionCallbackInfo<Value>& args) {
+ // This constructor should not be exposed to public javascript.
+ // Therefore we assert that we are not trying to call this as a
+ // normal function.
+ CHECK(args.IsConstructCall());
+ Environment* env = Environment::GetCurrent(args);
+ JSStream* wrap;
+
+ if (args.Length() == 0) {
+ wrap = new JSStream(env, args.This(), nullptr);
+ } else if (args[0]->IsExternal()) {
+ void* ptr = args[0].As<External>()->Value();
+ wrap = new JSStream(env, args.This(), static_cast<AsyncWrap*>(ptr));
+ } else {
+ UNREACHABLE();
+ }
+ CHECK(wrap);
+}
+
+
+static void FreeCallback(char* data, void* hint) {
+ // Intentional no-op
+}
+
+
+void JSStream::DoAlloc(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+ uv_buf_t buf;
+ wrap->OnAlloc(args[0]->Int32Value(), &buf);
+ args.GetReturnValue().Set(Buffer::New(wrap->env(),
+ buf.base,
+ buf.len,
+ FreeCallback,
+ nullptr));
+}
+
+
+void JSStream::DoRead(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+ CHECK(Buffer::HasInstance(args[1]));
+ uv_buf_t buf = uv_buf_init(Buffer::Data(args[1]), Buffer::Length(args[1]));
+ wrap->OnRead(args[0]->Int32Value(), &buf);
+}
+
+
+void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+ WriteWrap* w = Unwrap<WriteWrap>(args[0].As<Object>());
+
+ wrap->OnAfterWrite(w);
+}
+
+
+template <class Wrap>
+void JSStream::Finish(const FunctionCallbackInfo<Value>& args) {
+ Wrap* w = Unwrap<Wrap>(args[0].As<Object>());
+
+ w->Done(args[0]->Int32Value());
+}
+
+
+void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+ CHECK(Buffer::HasInstance(args[0]));
+ char* data = Buffer::Data(args[0]);
+ int len = Buffer::Length(args[0]);
+
+ do {
+ uv_buf_t buf;
+ ssize_t avail = len;
+ wrap->OnAlloc(len, &buf);
+ if (static_cast<ssize_t>(buf.len) < avail)
+ avail = buf.len;
+
+ memcpy(buf.base, data, avail);
+ data += avail;
+ len -= avail;
+ wrap->OnRead(avail, &buf);
+ } while (len != 0);
+}
+
+
+void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
+ JSStream* wrap = Unwrap<JSStream>(args.Holder());
+
+ wrap->OnRead(UV_EOF, nullptr);
+}
+
+
void JSStream::Initialize(Handle<Object> target,
Handle<Value> unused,
Handle<Context> context) {
+ Environment* env = Environment::GetCurrent(context);
+
+ Local<FunctionTemplate> t = env->NewFunctionTemplate(New);
+ t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"));
+ t->InstanceTemplate()->SetInternalFieldCount(1);
+
+ env->SetProtoMethod(t, "doAlloc", DoAlloc);
+ env->SetProtoMethod(t, "doRead", DoRead);
+ env->SetProtoMethod(t, "doAfterWrite", DoAfterWrite);
+ env->SetProtoMethod(t, "finishWrite", Finish<WriteWrap>);
+ env->SetProtoMethod(t, "finishShutdown", Finish<ShutdownWrap>);
+ env->SetProtoMethod(t, "readBuffer", ReadBuffer);
+ env->SetProtoMethod(t, "emitEOF", EmitEOF);
+
+ StreamBase::AddMethods<JSStream>(env, t);
+ target->Set(FIXED_ONE_BYTE_STRING(env->isolate(), "JSStream"),
+ t->GetFunction());
+ env->set_jsstream_constructor_template(t);
}
} // namespace node
+
+NODE_MODULE_CONTEXT_AWARE_BUILTIN(js_stream, node::JSStream::Initialize)
namespace node {
-class JSStream : public StreamBase {
+class JSStream : public StreamBase, public AsyncWrap {
public:
static void Initialize(v8::Handle<v8::Object> target,
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
+
+ void* Cast() override;
+ bool IsAlive() override;
+ bool IsClosing() override;
+ int ReadStart() override;
+ int ReadStop() override;
+
+ int DoShutdown(ShutdownWrap* req_wrap) override;
+ int DoWrite(WriteWrap* w,
+ uv_buf_t* bufs,
+ size_t count,
+ uv_stream_t* send_handle) override;
+
+ protected:
+ JSStream(Environment* env, v8::Handle<v8::Object> obj, AsyncWrap* parent);
+ ~JSStream();
+
+ AsyncWrap* GetAsyncWrap() override;
+
+ static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void DoAlloc(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void DoRead(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void DoAfterWrite(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void ReadBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void EmitEOF(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ template <class Wrap>
+ static void Finish(const v8::FunctionCallbackInfo<v8::Value>& args);
};
} // namespace node
#include "env.h"
#include "env-inl.h"
+#include "js_stream.h"
#include "pipe_wrap.h"
#include "tcp_wrap.h"
#include "tty_wrap.h"
env->tls_wrap_constructor_template()->HasInstance(obj)) { \
TLSWrap* const wrap = Unwrap<TLSWrap>(obj); \
BODY \
+ } else if (env->jsstream_constructor_template().IsEmpty() == false && \
+ env->jsstream_constructor_template()->HasInstance(obj)) { \
+ JSStream* const wrap = Unwrap<JSStream>(obj); \
+ BODY \
} \
}); \
} while (0)
#include "node_buffer.h"
#include "env.h"
#include "env-inl.h"
+#include "js_stream.h"
#include "string_bytes.h"
#include "tls_wrap.h"
#include "util.h"
Handle<FunctionTemplate> t);
template void StreamBase::AddMethods<TLSWrap>(Environment* env,
Handle<FunctionTemplate> t);
+template void StreamBase::AddMethods<JSStream>(Environment* env,
+ Handle<FunctionTemplate> t);
template <class Base>
}
-AsyncWrap* StreamBase::GetAsyncWrap() {
+bool StreamBase::IsIPCPipe() {
+ return false;
+}
+
+
+int StreamBase::GetFD() {
+ return -1;
+}
+
+
+int StreamResource::DoTryWrite(uv_buf_t** bufs, size_t* count) {
+ // No TryWrite by default
+ return 0;
+}
+
+
+const char* StreamResource::Error() const {
return nullptr;
}
+
+void StreamResource::ClearError() {
+ // No-op
+}
+
} // namespace node
virtual ~StreamResource() = default;
virtual int DoShutdown(ShutdownWrap* req_wrap) = 0;
- virtual int DoTryWrite(uv_buf_t** bufs, size_t* count) = 0;
+ virtual int DoTryWrite(uv_buf_t** bufs, size_t* count);
virtual int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) = 0;
- virtual const char* Error() const = 0;
- virtual void ClearError() = 0;
+ virtual const char* Error() const;
+ virtual void ClearError();
// Events
inline void OnAfterWrite(WriteWrap* w) {
inline void OnRead(size_t nread,
const uv_buf_t* buf,
- uv_handle_type pending) {
+ uv_handle_type pending = UV_UNKNOWN_HANDLE) {
if (read_cb_ != nullptr)
read_cb_(nread, buf, pending, read_ctx_);
}
v8::Handle<v8::FunctionTemplate> target);
virtual void* Cast() = 0;
- virtual bool IsAlive() const = 0;
- virtual bool IsClosing() const = 0;
- virtual bool IsIPCPipe() const = 0;
- virtual int GetFD() const = 0;
+ virtual bool IsAlive() = 0;
+ virtual bool IsClosing() = 0;
+ virtual bool IsIPCPipe();
+ virtual int GetFD();
virtual int ReadStart() = 0;
virtual int ReadStop() = 0;
}
-int StreamWrap::GetFD() const {
+int StreamWrap::GetFD() {
int fd = -1;
#if !defined(_WIN32)
if (stream() != nullptr)
}
-bool StreamWrap::IsAlive() const {
+bool StreamWrap::IsAlive() {
return HandleWrap::IsAlive(this);
}
-bool StreamWrap::IsClosing() const {
+bool StreamWrap::IsClosing() {
return uv_is_closing(reinterpret_cast<uv_handle_t*>(stream()));
}
}
-bool StreamWrap::IsIPCPipe() const {
+bool StreamWrap::IsIPCPipe() {
return is_named_pipe_ipc();
}
wrap->UpdateWriteQueueSize();
}
-
-const char* StreamWrap::Error() const {
- return nullptr;
-}
-
-
-void StreamWrap::ClearError() {
- // No-op
-}
-
} // namespace node
NODE_MODULE_CONTEXT_AWARE_BUILTIN(stream_wrap, node::StreamWrap::Initialize)
v8::Handle<v8::Value> unused,
v8::Handle<v8::Context> context);
- int GetFD() const override;
+ int GetFD() override;
void* Cast() override;
- bool IsAlive() const override;
- bool IsClosing() const override;
- bool IsIPCPipe() const override;
+ bool IsAlive() override;
+ bool IsClosing() override;
+ bool IsIPCPipe() override;
// JavaScript functions
int ReadStart() override;
uv_buf_t* bufs,
size_t count,
uv_stream_t* send_handle) override;
- const char* Error() const override;
- void ClearError() override;
inline uv_stream_t* stream() const {
return stream_;
size_t copy = buf.len > len ? len : buf.len;
memcpy(buf.base, data, copy);
buf.len = copy;
- wrap->stream_->OnRead(buf.len, &buf, UV_UNKNOWN_HANDLE);
+ wrap->stream_->OnRead(buf.len, &buf);
data += copy;
len -= copy;
if (static_cast<int>(buf.len) < avail)
avail = buf.len;
memcpy(buf.base, out, avail);
- OnRead(avail, &buf, UV_UNKNOWN_HANDLE);
+ OnRead(avail, &buf);
read -= avail;
}
int flags = SSL_get_shutdown(ssl_);
if (!eof_ && flags & SSL_RECEIVED_SHUTDOWN) {
eof_ = true;
- OnRead(UV_EOF, nullptr, UV_UNKNOWN_HANDLE);
+ OnRead(UV_EOF, nullptr);
}
if (read == -1) {
}
-bool TLSWrap::IsIPCPipe() const {
+bool TLSWrap::IsIPCPipe() {
return stream_->IsIPCPipe();
}
-int TLSWrap::GetFD() const {
+int TLSWrap::GetFD() {
return stream_->GetFD();
}
-bool TLSWrap::IsAlive() const {
+bool TLSWrap::IsAlive() {
return stream_->IsAlive();
}
-bool TLSWrap::IsClosing() const {
+bool TLSWrap::IsClosing() {
return stream_->IsClosing();
}
}
-int TLSWrap::DoTryWrite(uv_buf_t** bufs, size_t* count) {
- // TODO(indutny): Support it
- return 0;
-}
-
-
int TLSWrap::DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
- OnRead(nread, nullptr, UV_UNKNOWN_HANDLE);
+ OnRead(nread, nullptr);
return;
}
v8::Handle<v8::Context> context);
void* Cast() override;
- int GetFD() const override;
- bool IsAlive() const override;
- bool IsClosing() const override;
+ int GetFD() override;
+ bool IsAlive() override;
+ bool IsClosing() override;
// JavaScript functions
int ReadStart() override;
int ReadStop() override;
int DoShutdown(ShutdownWrap* req_wrap) override;
- int DoTryWrite(uv_buf_t** bufs, size_t* count) override;
int DoWrite(WriteWrap* w,
uv_buf_t* bufs,
size_t count,
TLSWrap(Environment* env,
Kind kind,
- StreamBase* steram,
+ StreamBase* stream,
v8::Handle<v8::Object> stream_obj,
v8::Handle<v8::Object> sc);
}
AsyncWrap* GetAsyncWrap() override;
- bool IsIPCPipe() const override;
+ bool IsIPCPipe() override;
// Resource implementation
static void OnAfterWriteImpl(WriteWrap* w, void* ctx);
--- /dev/null
+var assert = require('assert');
+var stream = require('stream');
+var tls = require('tls');
+var fs = require('fs');
+var net = require('net');
+
+var common = require('../common');
+
+var connected = {
+ client: 0,
+ server: 0
+};
+
+var server = tls.createServer({
+ key: fs.readFileSync(common.fixturesDir + '/keys/agent1-key.pem'),
+ cert: fs.readFileSync(common.fixturesDir + '/keys/agent1-cert.pem')
+}, function(c) {
+ console.log('new client');
+ connected.server++;
+ c.end('ohai');
+}).listen(common.PORT, function() {
+ var raw = net.connect(common.PORT);
+
+ var pending = false;
+ raw.on('readable', function() {
+ if (pending)
+ p._read();
+ });
+
+ var p = new stream.Duplex({
+ read: function read() {
+ pending = false;
+
+ var chunk = raw.read();
+ if (chunk) {
+ console.log('read', chunk);
+ this.push(chunk);
+ } else {
+ pending = true;
+ }
+ },
+ write: function write(data, enc, cb) {
+ console.log('write', data, enc);
+ raw.write(data, enc, cb);
+ }
+ });
+
+ var socket = tls.connect({
+ socket: p,
+ rejectUnauthorized: false
+ }, function() {
+ console.log('client secure');
+
+ connected.client++;
+
+ socket.end('hello');
+ socket.resume();
+ });
+
+ socket.once('close', function() {
+ console.log('client close');
+ server.close();
+ });
+});
+
+process.once('exit', function() {
+ assert.equal(connected.client, 1);
+ assert.equal(connected.server, 1);
+});