3 const assert = require('assert');
4 const util = require('util');
5 const Socket = require('net').Socket;
6 const JSStream = process.binding('js_stream').JSStream;
7 const uv = process.binding('uv');
8 const debug = util.debuglog('stream_wrap');
10 function StreamWrap(stream) {
11 const handle = new JSStream();
18 handle.close = function(cb) {
22 handle.isAlive = function() {
23 return self.isAlive();
25 handle.isClosing = function() {
26 return self.isClosing();
28 handle.onreadstart = function() {
29 return self.readStart();
31 handle.onreadstop = function() {
32 return self.readStop();
34 handle.onshutdown = function(req) {
35 return self.doShutdown(req);
37 handle.onwrite = function(req, bufs) {
38 return self.doWrite(req, bufs);
42 this.stream.on('error', function(err) {
43 self.emit('error', err);
45 this.stream.on('data', function(chunk) {
46 debug('data', chunk.length);
48 self._handle.readBuffer(chunk);
50 this.stream.once('end', function() {
53 self._handle.emitEOF();
60 util.inherits(StreamWrap, Socket);
61 module.exports = StreamWrap;
63 // require('_stream_wrap').StreamWrap
64 StreamWrap.StreamWrap = StreamWrap;
66 StreamWrap.prototype.isAlive = function isAlive() {
70 StreamWrap.prototype.isClosing = function isClosing() {
71 return !this.readable || !this.writable;
74 StreamWrap.prototype.readStart = function readStart() {
79 StreamWrap.prototype.readStop = function readStop() {
84 StreamWrap.prototype.doShutdown = function doShutdown(req) {
86 const handle = this._handle;
87 const item = this._enqueue('shutdown', req);
89 this.stream.end(function() {
90 // Ensure that write was dispatched
91 setImmediate(function() {
92 if (!self._dequeue(item))
95 handle.finishShutdown(req, 0);
101 StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
103 const handle = self._handle;
105 var pending = bufs.length;
107 // Queue the request to be able to cancel it
108 const item = self._enqueue('write', req);
111 bufs.forEach(function(buf) {
112 self.stream.write(buf, done);
114 self.stream.uncork();
117 if (!err && --pending !== 0)
120 // Ensure that this is called once in case of error
123 // Ensure that write was dispatched
124 setImmediate(function() {
125 // Do not invoke callback twice
126 if (!self._dequeue(item))
131 if (err.code && uv['UV_' + err.code])
132 errCode = uv['UV_' + err.code];
134 errCode = uv.UV_EPIPE;
137 handle.doAfterWrite(req);
138 handle.finishWrite(req, errCode);
145 function QueueItem(type, req) {
152 StreamWrap.prototype._enqueue = function enqueue(type, req) {
153 const item = new QueueItem(type, req);
154 if (this._list === null) {
159 item.next = this._list.next;
160 item.prev = this._list;
161 item.next.prev = item;
162 item.prev.next = item;
167 StreamWrap.prototype._dequeue = function dequeue(item) {
168 assert(item instanceof QueueItem);
170 var next = item.next;
171 var prev = item.prev;
173 if (next === null && prev === null)
187 if (this._list === item)
193 StreamWrap.prototype.doClose = function doClose(cb) {
195 const handle = self._handle;
197 setImmediate(function() {
198 while (self._list !== null) {
199 const item = self._list;
200 const req = item.req;
203 const errCode = uv.UV_ECANCELED;
204 if (item.type === 'write') {
205 handle.doAfterWrite(req);
206 handle.finishWrite(req, errCode);
207 } else if (item.type === 'shutdown') {
208 handle.finishShutdown(req, errCode);
212 // Should be already set by net.js
213 assert(self._handle === null);