doc: improvements to debugger.markdown copy
[platform/upstream/nodejs.git] / lib / _stream_wrap.js
1 'use strict';
2
3 const assert = require('assert');
4 const util = require('util');
5 const Socket = require('net').Socket;
6 const JSStream = process.binding('js_stream').JSStream;
7 const Buffer = require('buffer').Buffer;
8 const uv = process.binding('uv');
9 const debug = util.debuglog('stream_wrap');
10
11 function StreamWrap(stream) {
12   const handle = new JSStream();
13
14   this.stream = stream;
15
16   this._list = null;
17
18   const self = this;
19   handle.close = function(cb) {
20     debug('close');
21     self.doClose(cb);
22   };
23   handle.isAlive = function() {
24     return self.isAlive();
25   };
26   handle.isClosing = function() {
27     return self.isClosing();
28   };
29   handle.onreadstart = function() {
30     return self.readStart();
31   };
32   handle.onreadstop = function() {
33     return self.readStop();
34   };
35   handle.onshutdown = function(req) {
36     return self.doShutdown(req);
37   };
38   handle.onwrite = function(req, bufs) {
39     return self.doWrite(req, bufs);
40   };
41
42   this.stream.pause();
43   this.stream.on('error', function onerror(err) {
44     self.emit('error', err);
45   });
46   this.stream.on('data', function ondata(chunk) {
47     if (!(chunk instanceof Buffer)) {
48       // Make sure that no further `data` events will happen
49       this.pause();
50       this.removeListener('data', ondata);
51
52       self.emit('error', new Error('Stream has StringDecoder'));
53       return;
54     }
55
56     debug('data', chunk.length);
57     if (self._handle)
58       self._handle.readBuffer(chunk);
59   });
60   this.stream.once('end', function onend() {
61     debug('end');
62     if (self._handle)
63       self._handle.emitEOF();
64   });
65
66   Socket.call(this, {
67     handle: handle
68   });
69 }
70 util.inherits(StreamWrap, Socket);
71 module.exports = StreamWrap;
72
73 // require('_stream_wrap').StreamWrap
74 StreamWrap.StreamWrap = StreamWrap;
75
76 StreamWrap.prototype.isAlive = function isAlive() {
77   return true;
78 };
79
80 StreamWrap.prototype.isClosing = function isClosing() {
81   return !this.readable || !this.writable;
82 };
83
84 StreamWrap.prototype.readStart = function readStart() {
85   this.stream.resume();
86   return 0;
87 };
88
89 StreamWrap.prototype.readStop = function readStop() {
90   this.stream.pause();
91   return 0;
92 };
93
94 StreamWrap.prototype.doShutdown = function doShutdown(req) {
95   const self = this;
96   const handle = this._handle;
97   const item = this._enqueue('shutdown', req);
98
99   this.stream.end(function() {
100     // Ensure that write was dispatched
101     setImmediate(function() {
102       if (!self._dequeue(item))
103         return;
104
105       handle.finishShutdown(req, 0);
106     });
107   });
108   return 0;
109 };
110
111 StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
112   const self = this;
113   const handle = self._handle;
114
115   var pending = bufs.length;
116
117   // Queue the request to be able to cancel it
118   const item = self._enqueue('write', req);
119
120   self.stream.cork();
121   bufs.forEach(function(buf) {
122     self.stream.write(buf, done);
123   });
124   self.stream.uncork();
125
126   function done(err) {
127     if (!err && --pending !== 0)
128       return;
129
130     // Ensure that this is called once in case of error
131     pending = 0;
132
133     // Ensure that write was dispatched
134     setImmediate(function() {
135       // Do not invoke callback twice
136       if (!self._dequeue(item))
137         return;
138
139       var errCode = 0;
140       if (err) {
141         if (err.code && uv['UV_' + err.code])
142           errCode = uv['UV_' + err.code];
143         else
144           errCode = uv.UV_EPIPE;
145       }
146
147       handle.doAfterWrite(req);
148       handle.finishWrite(req, errCode);
149     });
150   }
151
152   return 0;
153 };
154
155 function QueueItem(type, req) {
156   this.type = type;
157   this.req = req;
158   this.prev = this;
159   this.next = this;
160 }
161
162 StreamWrap.prototype._enqueue = function enqueue(type, req) {
163   const item = new QueueItem(type, req);
164   if (this._list === null) {
165     this._list = item;
166     return item;
167   }
168
169   item.next = this._list.next;
170   item.prev = this._list;
171   item.next.prev = item;
172   item.prev.next = item;
173
174   return item;
175 };
176
177 StreamWrap.prototype._dequeue = function dequeue(item) {
178   assert(item instanceof QueueItem);
179
180   var next = item.next;
181   var prev = item.prev;
182
183   if (next === null && prev === null)
184     return false;
185
186   item.next = null;
187   item.prev = null;
188
189   if (next === item) {
190     prev = null;
191     next = null;
192   } else {
193     prev.next = next;
194     next.prev = prev;
195   }
196
197   if (this._list === item)
198     this._list = next;
199
200   return true;
201 };
202
203 StreamWrap.prototype.doClose = function doClose(cb) {
204   const self = this;
205   const handle = self._handle;
206
207   setImmediate(function() {
208     while (self._list !== null) {
209       const item = self._list;
210       const req = item.req;
211       self._dequeue(item);
212
213       const errCode = uv.UV_ECANCELED;
214       if (item.type === 'write') {
215         handle.doAfterWrite(req);
216         handle.finishWrite(req, errCode);
217       } else if (item.type === 'shutdown') {
218         handle.finishShutdown(req, errCode);
219       }
220     }
221
222     // Should be already set by net.js
223     assert(self._handle === null);
224     cb();
225   });
226 };