doc: improvements to console.markdown copy
[platform/upstream/nodejs.git] / lib / _stream_writable.js
1 // A bit simpler than readable streams.
2 // Implement an async ._write(chunk, encoding, cb), and it'll handle all
3 // the drain event emission and buffering.
4
5 'use strict';
6
7 module.exports = Writable;
8 Writable.WritableState = WritableState;
9
10 const util = require('util');
11 const internalUtil = require('internal/util');
12 const Stream = require('stream');
13 const Buffer = require('buffer').Buffer;
14
15 util.inherits(Writable, Stream);
16
17 function nop() {}
18
19 function WriteReq(chunk, encoding, cb) {
20   this.chunk = chunk;
21   this.encoding = encoding;
22   this.callback = cb;
23   this.next = null;
24 }
25
26 function WritableState(options, stream) {
27   options = options || {};
28
29   // object stream flag to indicate whether or not this stream
30   // contains buffers or objects.
31   this.objectMode = !!options.objectMode;
32
33   if (stream instanceof Stream.Duplex)
34     this.objectMode = this.objectMode || !!options.writableObjectMode;
35
36   // the point at which write() starts returning false
37   // Note: 0 is a valid value, means that we always return false if
38   // the entire buffer is not flushed immediately on write()
39   var hwm = options.highWaterMark;
40   var defaultHwm = this.objectMode ? 16 : 16 * 1024;
41   this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
42
43   // cast to ints.
44   this.highWaterMark = ~~this.highWaterMark;
45
46   this.needDrain = false;
47   // at the start of calling end()
48   this.ending = false;
49   // when end() has been called, and returned
50   this.ended = false;
51   // when 'finish' is emitted
52   this.finished = false;
53
54   // should we decode strings into buffers before passing to _write?
55   // this is here so that some node-core streams can optimize string
56   // handling at a lower level.
57   var noDecode = options.decodeStrings === false;
58   this.decodeStrings = !noDecode;
59
60   // Crypto is kind of old and crusty.  Historically, its default string
61   // encoding is 'binary' so we have to make this configurable.
62   // Everything else in the universe uses 'utf8', though.
63   this.defaultEncoding = options.defaultEncoding || 'utf8';
64
65   // not an actual buffer we keep track of, but a measurement
66   // of how much we're waiting to get pushed to some underlying
67   // socket or file.
68   this.length = 0;
69
70   // a flag to see when we're in the middle of a write.
71   this.writing = false;
72
73   // when true all writes will be buffered until .uncork() call
74   this.corked = 0;
75
76   // a flag to be able to tell if the onwrite cb is called immediately,
77   // or on a later tick.  We set this to true at first, because any
78   // actions that shouldn't happen until "later" should generally also
79   // not happen before the first write call.
80   this.sync = true;
81
82   // a flag to know if we're processing previously buffered items, which
83   // may call the _write() callback in the same tick, so that we don't
84   // end up in an overlapped onwrite situation.
85   this.bufferProcessing = false;
86
87   // the callback that's passed to _write(chunk,cb)
88   this.onwrite = function(er) {
89     onwrite(stream, er);
90   };
91
92   // the callback that the user supplies to write(chunk,encoding,cb)
93   this.writecb = null;
94
95   // the amount that is being written when _write is called.
96   this.writelen = 0;
97
98   this.bufferedRequest = null;
99   this.lastBufferedRequest = null;
100
101   // number of pending user-supplied write callbacks
102   // this must be 0 before 'finish' can be emitted
103   this.pendingcb = 0;
104
105   // emit prefinish if the only thing we're waiting for is _write cbs
106   // This is relevant for synchronous Transform streams
107   this.prefinished = false;
108
109   // True if the error was already emitted and should not be thrown again
110   this.errorEmitted = false;
111 }
112
113 WritableState.prototype.getBuffer = function writableStateGetBuffer() {
114   var current = this.bufferedRequest;
115   var out = [];
116   while (current) {
117     out.push(current);
118     current = current.next;
119   }
120   return out;
121 };
122
123 Object.defineProperty(WritableState.prototype, 'buffer', {
124   get: internalUtil.deprecate(function() {
125     return this.getBuffer();
126   }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' +
127      'instead.')
128 });
129
130 function Writable(options) {
131   // Writable ctor is applied to Duplexes, though they're not
132   // instanceof Writable, they're instanceof Readable.
133   if (!(this instanceof Writable) && !(this instanceof Stream.Duplex))
134     return new Writable(options);
135
136   this._writableState = new WritableState(options, this);
137
138   // legacy.
139   this.writable = true;
140
141   if (options) {
142     if (typeof options.write === 'function')
143       this._write = options.write;
144
145     if (typeof options.writev === 'function')
146       this._writev = options.writev;
147   }
148
149   Stream.call(this);
150 }
151
152 // Otherwise people can pipe Writable streams, which is just wrong.
153 Writable.prototype.pipe = function() {
154   this.emit('error', new Error('Cannot pipe. Not readable.'));
155 };
156
157
158 function writeAfterEnd(stream, cb) {
159   var er = new Error('write after end');
160   // TODO: defer error events consistently everywhere, not just the cb
161   stream.emit('error', er);
162   process.nextTick(cb, er);
163 }
164
165 // If we get something that is not a buffer, string, null, or undefined,
166 // and we're not in objectMode, then that's an error.
167 // Otherwise stream chunks are all considered to be of length=1, and the
168 // watermarks determine how many objects to keep in the buffer, rather than
169 // how many bytes or characters.
170 function validChunk(stream, state, chunk, cb) {
171   var valid = true;
172
173   if (!(chunk instanceof Buffer) &&
174       typeof chunk !== 'string' &&
175       chunk !== null &&
176       chunk !== undefined &&
177       !state.objectMode) {
178     var er = new TypeError('Invalid non-string/buffer chunk');
179     stream.emit('error', er);
180     process.nextTick(cb, er);
181     valid = false;
182   }
183   return valid;
184 }
185
186 Writable.prototype.write = function(chunk, encoding, cb) {
187   var state = this._writableState;
188   var ret = false;
189
190   if (typeof encoding === 'function') {
191     cb = encoding;
192     encoding = null;
193   }
194
195   if (chunk instanceof Buffer)
196     encoding = 'buffer';
197   else if (!encoding)
198     encoding = state.defaultEncoding;
199
200   if (typeof cb !== 'function')
201     cb = nop;
202
203   if (state.ended)
204     writeAfterEnd(this, cb);
205   else if (validChunk(this, state, chunk, cb)) {
206     state.pendingcb++;
207     ret = writeOrBuffer(this, state, chunk, encoding, cb);
208   }
209
210   return ret;
211 };
212
213 Writable.prototype.cork = function() {
214   var state = this._writableState;
215
216   state.corked++;
217 };
218
219 Writable.prototype.uncork = function() {
220   var state = this._writableState;
221
222   if (state.corked) {
223     state.corked--;
224
225     if (!state.writing &&
226         !state.corked &&
227         !state.finished &&
228         !state.bufferProcessing &&
229         state.bufferedRequest)
230       clearBuffer(this, state);
231   }
232 };
233
234 Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
235   // node::ParseEncoding() requires lower case.
236   if (typeof encoding === 'string')
237     encoding = encoding.toLowerCase();
238   if (!Buffer.isEncoding(encoding))
239     throw new TypeError('Unknown encoding: ' + encoding);
240   this._writableState.defaultEncoding = encoding;
241 };
242
243 function decodeChunk(state, chunk, encoding) {
244   if (!state.objectMode &&
245       state.decodeStrings !== false &&
246       typeof chunk === 'string') {
247     chunk = new Buffer(chunk, encoding);
248   }
249   return chunk;
250 }
251
252 // if we're already writing something, then just put this
253 // in the queue, and wait our turn.  Otherwise, call _write
254 // If we return false, then we need a drain event, so set that flag.
255 function writeOrBuffer(stream, state, chunk, encoding, cb) {
256   chunk = decodeChunk(state, chunk, encoding);
257
258   if (chunk instanceof Buffer)
259     encoding = 'buffer';
260   var len = state.objectMode ? 1 : chunk.length;
261
262   state.length += len;
263
264   var ret = state.length < state.highWaterMark;
265   // we must ensure that previous needDrain will not be reset to false.
266   if (!ret)
267     state.needDrain = true;
268
269   if (state.writing || state.corked) {
270     var last = state.lastBufferedRequest;
271     state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
272     if (last) {
273       last.next = state.lastBufferedRequest;
274     } else {
275       state.bufferedRequest = state.lastBufferedRequest;
276     }
277   } else {
278     doWrite(stream, state, false, len, chunk, encoding, cb);
279   }
280
281   return ret;
282 }
283
284 function doWrite(stream, state, writev, len, chunk, encoding, cb) {
285   state.writelen = len;
286   state.writecb = cb;
287   state.writing = true;
288   state.sync = true;
289   if (writev)
290     stream._writev(chunk, state.onwrite);
291   else
292     stream._write(chunk, encoding, state.onwrite);
293   state.sync = false;
294 }
295
296 function onwriteError(stream, state, sync, er, cb) {
297   --state.pendingcb;
298   if (sync)
299     process.nextTick(cb, er);
300   else
301     cb(er);
302
303   stream._writableState.errorEmitted = true;
304   stream.emit('error', er);
305 }
306
307 function onwriteStateUpdate(state) {
308   state.writing = false;
309   state.writecb = null;
310   state.length -= state.writelen;
311   state.writelen = 0;
312 }
313
314 function onwrite(stream, er) {
315   var state = stream._writableState;
316   var sync = state.sync;
317   var cb = state.writecb;
318
319   onwriteStateUpdate(state);
320
321   if (er)
322     onwriteError(stream, state, sync, er, cb);
323   else {
324     // Check if we're actually ready to finish, but don't emit yet
325     var finished = needFinish(state);
326
327     if (!finished &&
328         !state.corked &&
329         !state.bufferProcessing &&
330         state.bufferedRequest) {
331       clearBuffer(stream, state);
332     }
333
334     if (sync) {
335       process.nextTick(afterWrite, stream, state, finished, cb);
336     } else {
337       afterWrite(stream, state, finished, cb);
338     }
339   }
340 }
341
342 function afterWrite(stream, state, finished, cb) {
343   if (!finished)
344     onwriteDrain(stream, state);
345   state.pendingcb--;
346   cb();
347   finishMaybe(stream, state);
348 }
349
350 // Must force callback to be called on nextTick, so that we don't
351 // emit 'drain' before the write() consumer gets the 'false' return
352 // value, and has a chance to attach a 'drain' listener.
353 function onwriteDrain(stream, state) {
354   if (state.length === 0 && state.needDrain) {
355     state.needDrain = false;
356     stream.emit('drain');
357   }
358 }
359
360
361 // if there's something in the buffer waiting, then process it
362 function clearBuffer(stream, state) {
363   state.bufferProcessing = true;
364   var entry = state.bufferedRequest;
365
366   if (stream._writev && entry && entry.next) {
367     // Fast case, write everything using _writev()
368     var buffer = [];
369     var cbs = [];
370     while (entry) {
371       cbs.push(entry.callback);
372       buffer.push(entry);
373       entry = entry.next;
374     }
375
376     // count the one we are adding, as well.
377     // TODO(isaacs) clean this up
378     state.pendingcb++;
379     state.lastBufferedRequest = null;
380     doWrite(stream, state, true, state.length, buffer, '', function(err) {
381       for (var i = 0; i < cbs.length; i++) {
382         state.pendingcb--;
383         cbs[i](err);
384       }
385     });
386
387     // Clear buffer
388   } else {
389     // Slow case, write chunks one-by-one
390     while (entry) {
391       var chunk = entry.chunk;
392       var encoding = entry.encoding;
393       var cb = entry.callback;
394       var len = state.objectMode ? 1 : chunk.length;
395
396       doWrite(stream, state, false, len, chunk, encoding, cb);
397       entry = entry.next;
398       // if we didn't call the onwrite immediately, then
399       // it means that we need to wait until it does.
400       // also, that means that the chunk and cb are currently
401       // being processed, so move the buffer counter past them.
402       if (state.writing) {
403         break;
404       }
405     }
406
407     if (entry === null)
408       state.lastBufferedRequest = null;
409   }
410   state.bufferedRequest = entry;
411   state.bufferProcessing = false;
412 }
413
414 Writable.prototype._write = function(chunk, encoding, cb) {
415   cb(new Error('not implemented'));
416 };
417
418 Writable.prototype._writev = null;
419
420 Writable.prototype.end = function(chunk, encoding, cb) {
421   var state = this._writableState;
422
423   if (typeof chunk === 'function') {
424     cb = chunk;
425     chunk = null;
426     encoding = null;
427   } else if (typeof encoding === 'function') {
428     cb = encoding;
429     encoding = null;
430   }
431
432   if (chunk !== null && chunk !== undefined)
433     this.write(chunk, encoding);
434
435   // .end() fully uncorks
436   if (state.corked) {
437     state.corked = 1;
438     this.uncork();
439   }
440
441   // ignore unnecessary end() calls.
442   if (!state.ending && !state.finished)
443     endWritable(this, state, cb);
444 };
445
446
447 function needFinish(state) {
448   return (state.ending &&
449           state.length === 0 &&
450           state.bufferedRequest === null &&
451           !state.finished &&
452           !state.writing);
453 }
454
455 function prefinish(stream, state) {
456   if (!state.prefinished) {
457     state.prefinished = true;
458     stream.emit('prefinish');
459   }
460 }
461
462 function finishMaybe(stream, state) {
463   var need = needFinish(state);
464   if (need) {
465     if (state.pendingcb === 0) {
466       prefinish(stream, state);
467       state.finished = true;
468       stream.emit('finish');
469     } else {
470       prefinish(stream, state);
471     }
472   }
473   return need;
474 }
475
476 function endWritable(stream, state, cb) {
477   state.ending = true;
478   finishMaybe(stream, state);
479   if (cb) {
480     if (state.finished)
481       process.nextTick(cb);
482     else
483       stream.once('finish', cb);
484   }
485   state.ended = true;
486 }