dns: add missing exports.BADNAME
[platform/upstream/nodejs.git] / lib / _stream_wrap.js
1 'use strict';
2
3 const assert = require('assert');
4 const util = require('util');
5 const Socket = require('net').Socket;
6 const JSStream = process.binding('js_stream').JSStream;
7 const uv = process.binding('uv');
8 const debug = util.debuglog('stream_wrap');
9
10 function StreamWrap(stream) {
11   const handle = new JSStream();
12
13   this.stream = stream;
14
15   this._list = null;
16
17   const self = this;
18   handle.close = function(cb) {
19     debug('close');
20     self.doClose(cb);
21   };
22   handle.isAlive = function() {
23     return self.isAlive();
24   };
25   handle.isClosing = function() {
26     return self.isClosing();
27   };
28   handle.onreadstart = function() {
29     return self.readStart();
30   };
31   handle.onreadstop = function() {
32     return self.readStop();
33   };
34   handle.onshutdown = function(req) {
35     return self.doShutdown(req);
36   };
37   handle.onwrite = function(req, bufs) {
38     return self.doWrite(req, bufs);
39   };
40
41   this.stream.pause();
42   this.stream.on('error', function(err) {
43     self.emit('error', err);
44   });
45   this.stream.on('data', function(chunk) {
46     debug('data', chunk.length);
47     if (self._handle)
48       self._handle.readBuffer(chunk);
49   });
50   this.stream.once('end', function() {
51     debug('end');
52     if (self._handle)
53       self._handle.emitEOF();
54   });
55
56   Socket.call(this, {
57     handle: handle
58   });
59 }
60 util.inherits(StreamWrap, Socket);
61 module.exports = StreamWrap;
62
63 // require('_stream_wrap').StreamWrap
64 StreamWrap.StreamWrap = StreamWrap;
65
66 StreamWrap.prototype.isAlive = function isAlive() {
67   return true;
68 };
69
70 StreamWrap.prototype.isClosing = function isClosing() {
71   return !this.readable || !this.writable;
72 };
73
74 StreamWrap.prototype.readStart = function readStart() {
75   this.stream.resume();
76   return 0;
77 };
78
79 StreamWrap.prototype.readStop = function readStop() {
80   this.stream.pause();
81   return 0;
82 };
83
84 StreamWrap.prototype.doShutdown = function doShutdown(req) {
85   const self = this;
86   const handle = this._handle;
87   const item = this._enqueue('shutdown', req);
88
89   this.stream.end(function() {
90     // Ensure that write was dispatched
91     setImmediate(function() {
92       if (!self._dequeue(item))
93         return;
94
95       handle.finishShutdown(req, 0);
96     });
97   });
98   return 0;
99 };
100
101 StreamWrap.prototype.doWrite = function doWrite(req, bufs) {
102   const self = this;
103   const handle = self._handle;
104
105   var pending = bufs.length;
106
107   // Queue the request to be able to cancel it
108   const item = self._enqueue('write', req);
109
110   self.stream.cork();
111   bufs.forEach(function(buf) {
112     self.stream.write(buf, done);
113   });
114   self.stream.uncork();
115
116   function done(err) {
117     if (!err && --pending !== 0)
118       return;
119
120     // Ensure that this is called once in case of error
121     pending = 0;
122
123     // Ensure that write was dispatched
124     setImmediate(function() {
125       // Do not invoke callback twice
126       if (!self._dequeue(item))
127         return;
128
129       var errCode = 0;
130       if (err) {
131         if (err.code && uv['UV_' + err.code])
132           errCode = uv['UV_' + err.code];
133         else
134           errCode = uv.UV_EPIPE;
135       }
136
137       handle.doAfterWrite(req);
138       handle.finishWrite(req, errCode);
139     });
140   }
141
142   return 0;
143 };
144
145 function QueueItem(type, req) {
146   this.type = type;
147   this.req = req;
148   this.prev = this;
149   this.next = this;
150 }
151
152 StreamWrap.prototype._enqueue = function enqueue(type, req) {
153   const item = new QueueItem(type, req);
154   if (this._list === null) {
155     this._list = item;
156     return item;
157   }
158
159   item.next = this._list.next;
160   item.prev = this._list;
161   item.next.prev = item;
162   item.prev.next = item;
163
164   return item;
165 };
166
167 StreamWrap.prototype._dequeue = function dequeue(item) {
168   assert(item instanceof QueueItem);
169
170   var next = item.next;
171   var prev = item.prev;
172
173   if (next === null && prev === null)
174     return false;
175
176   item.next = null;
177   item.prev = null;
178
179   if (next === item) {
180     prev = null;
181     next = null;
182   } else {
183     prev.next = next;
184     next.prev = prev;
185   }
186
187   if (this._list === item)
188     this._list = next;
189
190   return true;
191 };
192
193 StreamWrap.prototype.doClose = function doClose(cb) {
194   const self = this;
195   const handle = self._handle;
196
197   setImmediate(function() {
198     while (self._list !== null) {
199       const item = self._list;
200       const req = item.req;
201       self._dequeue(item);
202
203       const errCode = uv.UV_ECANCELED;
204       if (item.type === 'write') {
205         handle.doAfterWrite(req);
206         handle.finishWrite(req, errCode);
207       } else if (item.type === 'shutdown') {
208         handle.finishShutdown(req, errCode);
209       }
210     }
211
212     // Should be already set by net.js
213     assert(self._handle === null);
214     cb();
215   });
216 };