3 const assert = require('assert');
4 const util = require('util');
5 const Socket = require('net').Socket;
6 const JSStream = process.binding('js_stream').JSStream;
7 const Buffer = require('buffer').Buffer;
8 const uv = process.binding('uv');
9 const debug = util.debuglog('stream_wrap');
// StreamWrap constructor: creates a JSStream handle, points the handle's hooks
// at the wrapper's own methods, and relays the wrapped stream's 'error',
// 'data' and 'end' events.
// NOTE(review): `self` and `this.stream` are assigned on lines not shown in
// this excerpt; presumably `self` is this instance and `this.stream` is the
// `stream` argument - confirm.
11 function StreamWrap(stream) {
12 const handle = new JSStream();
// Handle hooks: each hook whose body is visible forwards to the wrapper method
// named in its return statement.
19 handle.close = function(cb) {
23 handle.isAlive = function() {
24 return self.isAlive();
26 handle.isClosing = function() {
27 return self.isClosing();
29 handle.onreadstart = function() {
30 return self.readStart();
32 handle.onreadstop = function() {
33 return self.readStop();
35 handle.onshutdown = function(req) {
36 return self.doShutdown(req);
38 handle.onwrite = function(req, bufs) {
39 return self.doWrite(req, bufs);
// Errors from the wrapped stream are re-emitted on the wrapper.
43 this.stream.on('error', function onerror(err) {
44 self.emit('error', err);
// Only Buffer chunks are accepted. Anything else makes the wrapper detach this
// listener from the emitting stream and emit an error on itself.
46 this.stream.on('data', function ondata(chunk) {
47 if (!(chunk instanceof Buffer)) {
48 // Make sure that no further `data` events will happen
50 this.removeListener('data', ondata);
52 self.emit('error', new Error('Stream has StringDecoder'));
56 debug('data', chunk.length);
// Hand the chunk to the handle through its readBuffer method.
58 self._handle.readBuffer(chunk);
// The first 'end' from the wrapped stream is reported to the handle as EOF.
60 this.stream.once('end', function onend() {
63 self._handle.emitEOF();
// StreamWrap inherits from net.Socket and is the value exported by the module.
70 util.inherits(StreamWrap, Socket);
71 module.exports = StreamWrap;
// The constructor is also exposed as a property of itself, so the export and
// its `.StreamWrap` property resolve to the same function.
73 // require('_stream_wrap').StreamWrap
74 StreamWrap.StreamWrap = StreamWrap;
// isAlive(): the method the handle's isAlive hook forwards to (the constructor
// defines that hook as `return self.isAlive()`).
76 StreamWrap.prototype.isAlive = function isAlive() {
// isClosing(): returns true as soon as the wrapper has stopped being readable
// or has stopped being writable; false only while both flags are truthy.
80 StreamWrap.prototype.isClosing = function isClosing() {
81 return !this.readable || !this.writable;
// readStart(): the method the handle's onreadstart hook forwards to; its
// return value is passed back to the handle by that hook.
84 StreamWrap.prototype.readStart = function readStart() {
// readStop(): the method the handle's onreadstop hook forwards to; its return
// value is passed back to the handle by that hook.
89 StreamWrap.prototype.readStop = function readStop() {
// doShutdown(req): services a shutdown request coming from the handle by
// ending the wrapped stream and then reporting completion on the handle.
//   req - the shutdown request object received through handle.onshutdown.
94 StreamWrap.prototype.doShutdown = function doShutdown(req) {
96 const handle = this._handle;
// Track the request in the pending queue under the 'shutdown' type.
97 const item = this._enqueue('shutdown', req);
99 this.stream.end(function() {
100 // Ensure that write was dispatched
101 setImmediate(function() {
// A falsy _dequeue result is treated as a guard condition here.
// NOTE(review): the guarded statement is not visible in this excerpt;
// presumably it returns early because the request was already completed
// elsewhere (doClose finishes queued requests) - confirm.
102 if (!self._dequeue(item))
// Report completion to the handle with status 0.
105 handle.finishShutdown(req, 0);
// doWrite(req, bufs): services a write request coming from the handle by
// writing every buffer to the wrapped stream and reporting one result on the
// handle.
//   req  - the write request object received through handle.onwrite.
//   bufs - array of chunks; each element is passed to stream.write().
111 StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
113 const handle = self._handle;
// Number of stream.write() callbacks still outstanding.
115 var pending = bufs.length;
117 // Queue the request to be able to cancel it
118 const item = self._enqueue('write', req);
// Every chunk shares the same completion callback, `done`.
121 bufs.forEach(function(buf) {
122 self.stream.write(buf, done);
// NOTE(review): the matching cork() call is not visible in this excerpt.
124 self.stream.uncork();
// While no error occurred, count this chunk as finished; the condition holds
// as long as other chunks are still pending.
// NOTE(review): the guarded statement is not visible; presumably an early
// return that waits for the remaining callbacks - confirm.
127 if (!err && --pending !== 0)
130 // Ensure that this is called once in case of error
133 // Ensure that write was dispatched
134 setImmediate(function() {
135 // Do not invoke callback twice
136 if (!self._dequeue(item))
// Translate the error into a uv status code: use the uv constant named
// 'UV_' + err.code when the uv binding defines one, otherwise UV_EPIPE.
141 if (err.code && uv['UV_' + err.code])
142 errCode = uv['UV_' + err.code];
144 errCode = uv.UV_EPIPE;
// Call the handle's doAfterWrite for the request, then report the final
// status code through finishWrite.
147 handle.doAfterWrite(req);
148 handle.finishWrite(req, errCode);
// QueueItem(type, req): entry of the pending-request list. Instances are
// created by _enqueue; other code in this file reads their `type`, `req`,
// `next` and `prev` properties.
155 function QueueItem(type, req) {
// _enqueue(type, req): wraps a request in a QueueItem and links it into the
// pending list rooted at this._list. Callers keep the result and later pass
// it to _dequeue.
//   type - 'write' or 'shutdown' in the calls visible in this file.
//   req  - the request object to remember.
162 StreamWrap.prototype._enqueue = function enqueue(type, req) {
163 const item = new QueueItem(type, req);
// NOTE(review): the handling of an empty list (this._list === null) is not
// visible in this excerpt.
164 if (this._list === null) {
// Link the new item in directly after the node this._list points to, then
// repoint both neighbours at it.
169 item.next = this._list.next;
170 item.prev = this._list;
171 item.next.prev = item;
172 item.prev.next = item;
// _dequeue(item): removes a QueueItem from the pending list. Callers test the
// result for truthiness and treat a falsy value as a guard condition.
//   item - must be a QueueItem; anything else fails the assertion.
177 StreamWrap.prototype._dequeue = function dequeue(item) {
178 assert(item instanceof QueueItem);
180 var next = item.next;
181 var prev = item.prev;
// Both links being null is checked as a special case.
// NOTE(review): the guarded statement is not visible; presumably this means
// the item is no longer queued and a falsy value is returned - confirm.
183 if (next === null && prev === null)
// Special case for removing the node that this._list itself points to (the
// replacement value is assigned on a line not shown here).
197 if (this._list === item)
// doClose(cb): on the next setImmediate tick, completes every request still in
// the pending list with UV_ECANCELED.
//   cb - callback supplied by the caller.
// NOTE(review): the call to cb and the step that advances the loop are not
// visible in this excerpt.
203 StreamWrap.prototype.doClose = function doClose(cb) {
// The handle is captured up front; the assertion at the end expects
// self._handle to be null by the time it is checked.
205 const handle = self._handle;
207 setImmediate(function() {
208 while (self._list !== null) {
209 const item = self._list;
210 const req = item.req;
// Finish the request on the handle according to its type, reporting the
// cancellation status instead of success.
213 const errCode = uv.UV_ECANCELED;
214 if (item.type === 'write') {
215 handle.doAfterWrite(req);
216 handle.finishWrite(req, errCode);
217 } else if (item.type === 'shutdown') {
218 handle.finishShutdown(req, errCode);
222 // Should be already set by net.js
223 assert(self._handle === null);