1 // A bit simpler than readable streams.
2 // Implement an async ._write(chunk, encoding, cb), and it'll handle all
3 // the drain event emission and buffering.
// The module's export is the Writable constructor itself; WritableState is
// exposed as a property on it.
7 module.exports = Writable;
8 Writable.WritableState = WritableState;
10 const util = require('util');
11 const internalUtil = require('internal/util');
12 const Stream = require('stream');
13 const Buffer = require('buffer').Buffer;
// Writable inherits from Stream (set up through util.inherits).
15 util.inherits(Writable, Stream);
// Constructor for one queued write request. Elsewhere in this file instances
// are created with `new WriteReq(chunk, encoding, cb)` and later read through
// their `.chunk`, `.encoding`, `.callback` and `.next` properties.
// NOTE(review): only the `encoding` assignment is visible in this excerpt;
// confirm the remaining field assignments against the full file.
19 function WriteReq(chunk, encoding, cb) {
21 this.encoding = encoding;
// Bookkeeping object for the writable side of a stream.
//   options - user-supplied options object; a falsy value is replaced by {}.
//   stream  - the owning stream; in the visible code it is only used to
//             detect Stream.Duplex instances.
// NOTE(review): several field assignments are not visible in this excerpt
// (some comments below have lost the statement they describe); confirm
// against the full file before relying on this list of fields.
26 function WritableState(options, stream) {
27 options = options || {};
29 // object stream flag to indicate whether or not this stream
30 // contains buffers or objects.
31 this.objectMode = !!options.objectMode;
// For Duplex streams, writableObjectMode can also switch objectMode on.
33 if (stream instanceof Stream.Duplex)
34 this.objectMode = this.objectMode || !!options.writableObjectMode;
36 // the point at which write() starts returning false
37 // Note: 0 is a valid value, means that we always return false if
38 // the entire buffer is not flushed immediately on write()
39 var hwm = options.highWaterMark;
// Default: 16 in objectMode, 16 * 1024 otherwise.
40 var defaultHwm = this.objectMode ? 16 : 16 * 1024;
// An explicit 0 is honoured; any other falsy value falls back to the default.
41 this.highWaterMark = (hwm || hwm === 0) ? hwm : defaultHwm;
// ~~ coerces the value to a 32-bit integer, dropping any fractional part.
44 this.highWaterMark = ~~this.highWaterMark;
46 this.needDrain = false;
47 // at the start of calling end()
49 // when end() has been called, and returned
51 // when 'finish' is emitted
52 this.finished = false;
54 // should we decode strings into buffers before passing to _write?
55 // this is here so that some node-core streams can optimize string
56 // handling at a lower level.
// Decoding stays enabled unless options.decodeStrings is exactly false.
57 var noDecode = options.decodeStrings === false;
58 this.decodeStrings = !noDecode;
60 // Crypto is kind of old and crusty. Historically, its default string
61 // encoding is 'binary' so we have to make this configurable.
62 // Everything else in the universe uses 'utf8', though.
63 this.defaultEncoding = options.defaultEncoding || 'utf8';
65 // not an actual buffer we keep track of, but a measurement
66 // of how much we're waiting to get pushed to some underlying
70 // a flag to see when we're in the middle of a write.
73 // when true all writes will be buffered until .uncork() call
76 // a flag to be able to tell if the onwrite cb is called immediately,
77 // or on a later tick. We set this to true at first, because any
78 // actions that shouldn't happen until "later" should generally also
79 // not happen before the first write call.
82 // a flag to know if we're processing previously buffered items, which
83 // may call the _write() callback in the same tick, so that we don't
84 // end up in an overlapped onwrite situation.
85 this.bufferProcessing = false;
87 // the callback that's passed to _write(chunk,cb)
// NOTE(review): the body of this callback is not visible in this excerpt.
88 this.onwrite = function(er) {
92 // the callback that the user supplies to write(chunk,encoding,cb)
95 // the amount that is being written when _write is called.
// Head and tail of the linked list of queued write requests.
98 this.bufferedRequest = null;
99 this.lastBufferedRequest = null;
101 // number of pending user-supplied write callbacks
102 // this must be 0 before 'finish' can be emitted
105 // emit prefinish if the only thing we're waiting for is _write cbs
106 // This is relevant for synchronous Transform streams
107 this.prefinished = false;
109 // True if the error was already emitted and should not be thrown again
110 this.errorEmitted = false;
// Starts at this.bufferedRequest (the head of the queued-request list) and
// advances through the list via `.next`.
// NOTE(review): the loop header, what is collected per entry, and the return
// statement are not visible in this excerpt — confirm in the full file.
113 WritableState.prototype.getBuffer = function writableStateGetBuffer() {
114 var current = this.bufferedRequest;
118 current = current.next;
// Deprecated accessor: reading `_writableState.buffer` returns the result of
// getBuffer(). The getter is wrapped in internalUtil.deprecate with a message
// that points callers at getBuffer instead.
123 Object.defineProperty(WritableState.prototype, 'buffer', {
124 get: internalUtil.deprecate(function() {
125 return this.getBuffer();
126 }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' +
// Writable stream constructor.
//   options - passed through to WritableState; `options.write` and
//             `options.writev`, when they are functions, are installed as
//             this instance's _write / _writev.
// May be called without `new`: if `this` is neither a Writable nor a
// Stream.Duplex, a new Writable is constructed and returned.
// NOTE(review): some statements between the visible lines are elided in this
// excerpt (e.g. whatever guards the `options.write` lookups).
130 function Writable(options) {
131 // Writable ctor is applied to Duplexes, though they're not
132 // instanceof Writable, they're instanceof Readable.
133 if (!(this instanceof Writable) && !(this instanceof Stream.Duplex))
134 return new Writable(options);
136 this._writableState = new WritableState(options, this);
139 this.writable = true;
142 if (typeof options.write === 'function')
143 this._write = options.write;
145 if (typeof options.writev === 'function')
146 this._writev = options.writev;
// A Writable is not a readable source, so pipe() does no piping: it emits an
// 'error' event ('Cannot pipe. Not readable.') on the stream.
152 // Otherwise people can pipe Writable streams, which is just wrong.
153 Writable.prototype.pipe = function() {
154 this.emit('error', new Error('Cannot pipe. Not readable.'));
// Reports a 'write after end' error for `stream`.
//   stream - the stream on which 'error' is emitted (synchronously).
//   cb     - callback that receives the same error on the next tick.
158 function writeAfterEnd(stream, cb) {
159 var er = new Error('write after end');
160 // TODO: defer error events consistently everywhere, not just the cb
161 stream.emit('error', er);
162 process.nextTick(cb, er);
165 // If we get something that is not a buffer, string, null, or undefined,
166 // and we're not in objectMode, then that's an error.
167 // Otherwise stream chunks are all considered to be of length=1, and the
168 // watermarks determine how many objects to keep in the buffer, rather than
169 // how many bytes or characters.
// NOTE(review): parts of the condition and the return statement are not
// visible in this excerpt. write() uses the result as a boolean, so this
// presumably returns false for a rejected chunk — confirm in the full file.
170 function validChunk(stream, state, chunk, cb) {
173 if (!(chunk instanceof Buffer) &&
174 typeof chunk !== 'string' &&
176 chunk !== undefined &&
// Rejected chunk: emit 'error' on the stream now, and hand the same
// TypeError to cb on the next tick.
178 var er = new TypeError('Invalid non-string/buffer chunk');
179 stream.emit('error', er);
180 process.nextTick(cb, er);
// Public entry point for writing a chunk.
//   chunk    - data to write; checked by validChunk before being queued.
//   encoding - string encoding; a function in this position is detected via
//              typeof (i.e. the encoding argument may be omitted).
//   cb       - optional completion callback.
// NOTE(review): several branches are elided in this excerpt (argument
// shuffling, the condition that guards writeAfterEnd, and the return).
186 Writable.prototype.write = function(chunk, encoding, cb) {
187 var state = this._writableState;
190 if (typeof encoding === 'function') {
195 if (chunk instanceof Buffer)
198 encoding = state.defaultEncoding;
200 if (typeof cb !== 'function')
204 writeAfterEnd(this, cb);
// Only chunks accepted by validChunk reach writeOrBuffer; its result is
// stored in `ret`.
205 else if (validChunk(this, state, chunk, cb)) {
207 ret = writeOrBuffer(this, state, chunk, encoding, cb);
// NOTE(review): only the state lookup is visible in this excerpt. Per the
// WritableState comments, a corked stream buffers all writes until uncork();
// confirm the exact state change in the full file.
213 Writable.prototype.cork = function() {
214 var state = this._writableState;
// Flushes queued requests through clearBuffer when, among the visible
// conditions, no write is in flight, the buffer is not already being
// processed, and there is at least one buffered request.
// NOTE(review): other statements and conditions are elided in this excerpt.
219 Writable.prototype.uncork = function() {
220 var state = this._writableState;
225 if (!state.writing &&
228 !state.bufferProcessing &&
229 state.bufferedRequest)
230 clearBuffer(this, state);
// Stores `encoding` as _writableState.defaultEncoding.
//   encoding - encoding name; strings are lower-cased before validation.
// Throws TypeError('Unknown encoding: ...') when Buffer.isEncoding rejects
// the value; non-string values are passed to Buffer.isEncoding as-is.
234 Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
235 // node::ParseEncoding() requires lower case.
236 if (typeof encoding === 'string')
237 encoding = encoding.toLowerCase();
238 if (!Buffer.isEncoding(encoding))
239 throw new TypeError('Unknown encoding: ' + encoding);
240 this._writableState.defaultEncoding = encoding;
// Converts a string chunk to a Buffer using `encoding`, unless the stream is
// in objectMode or state.decodeStrings is false. Non-string chunks do not
// enter the conversion branch.
// NOTE(review): the return statement is not visible in this excerpt;
// writeOrBuffer uses the result as the (possibly converted) chunk.
243 function decodeChunk(state, chunk, encoding) {
244 if (!state.objectMode &&
245 state.decodeStrings !== false &&
246 typeof chunk === 'string') {
247 chunk = new Buffer(chunk, encoding);
252 // if we're already writing something, then just put this
253 // in the queue, and wait our turn. Otherwise, call _write
254 // If we return false, then we need a drain event, so set that flag.
// NOTE(review): a few statements are elided in this excerpt (the
// state.length update, some branch headers and the return).
255 function writeOrBuffer(stream, state, chunk, encoding, cb) {
256 chunk = decodeChunk(state, chunk, encoding);
258 if (chunk instanceof Buffer)
// In objectMode every chunk counts as length 1, whatever its size.
260 var len = state.objectMode ? 1 : chunk.length;
// ret is true only while the buffered length is below the high-water mark.
264 var ret = state.length < state.highWaterMark;
265 // we must ensure that previous needDrain will not be reset to false.
267 state.needDrain = true;
// A write in flight or a corked stream means this request must be queued:
// a new WriteReq becomes the tail of the linked list.
269 if (state.writing || state.corked) {
270 var last = state.lastBufferedRequest;
271 state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
273 last.next = state.lastBufferedRequest;
275 state.bufferedRequest = state.lastBufferedRequest;
// Otherwise the chunk is written straight away (non-vectored).
278 doWrite(stream, state, false, len, chunk, encoding, cb);
// Starts one underlying write: records the length being written in
// state.writelen, marks the state as writing, and calls stream._writev or
// stream._write with state.onwrite as the completion callback.
// NOTE(review): the branch choosing between _writev and _write is not
// visible in this excerpt — presumably it tests the `writev` flag. Other
// bookkeeping (e.g. storing `cb`) is elided as well.
284 function doWrite(stream, state, writev, len, chunk, encoding, cb) {
285 state.writelen = len;
287 state.writing = true;
290 stream._writev(chunk, state.onwrite);
292 stream._write(chunk, encoding, state.onwrite);
// Error path for a finished write: the write callback receives `er` (the
// visible branch defers it with process.nextTick), then errorEmitted is set
// on the stream's writable state and 'error' is emitted on the stream.
// NOTE(review): the branching on `sync` is elided in this excerpt.
296 function onwriteError(stream, state, sync, er, cb) {
299 process.nextTick(cb, er);
303 stream._writableState.errorEmitted = true;
304 stream.emit('error', er);
// Resets per-write bookkeeping once a write has completed: clears the
// `writing` flag and the stored callback, and subtracts the completed
// write's length (state.writelen) from the total buffered length.
307 function onwriteStateUpdate(state) {
308 state.writing = false;
309 state.writecb = null;
310 state.length -= state.writelen;
// Completion handler for an underlying write.
//   stream - the stream whose write finished.
//   er     - error reported by the write, if any.
// NOTE(review): the if/else headers are elided in this excerpt; the notes
// below describe only what the visible statements do.
314 function onwrite(stream, er) {
315 var state = stream._writableState;
// sync and cb are read first because onwriteStateUpdate nulls state.writecb.
316 var sync = state.sync;
317 var cb = state.writecb;
319 onwriteStateUpdate(state);
322 onwriteError(stream, state, sync, er, cb);
324 // Check if we're actually ready to finish, but don't emit yet
325 var finished = needFinish(state);
// Keep draining queued requests unless the buffer is already being processed.
329 !state.bufferProcessing &&
330 state.bufferedRequest) {
331 clearBuffer(stream, state);
// afterWrite runs either on the next tick or immediately; the selecting
// condition is not visible here — presumably `sync`.
335 process.nextTick(afterWrite, stream, state, finished, cb);
337 afterWrite(stream, state, finished, cb);
// Post-write follow-up: calls onwriteDrain (which may emit 'drain') and
// finishMaybe (which may emit 'finish').
// NOTE(review): the use of `finished` and `cb` is not visible in this
// excerpt — confirm in the full file.
342 function afterWrite(stream, state, finished, cb) {
344 onwriteDrain(stream, state);
347 finishMaybe(stream, state);
350 // Must force callback to be called on nextTick, so that we don't
351 // emit 'drain' before the write() consumer gets the 'false' return
352 // value, and has a chance to attach a 'drain' listener.
// Emits 'drain' once the buffered length is back to 0 and a drain was
// requested; needDrain is cleared before the event is emitted.
353 function onwriteDrain(stream, state) {
354 if (state.length === 0 && state.needDrain) {
355 state.needDrain = false;
356 stream.emit('drain');
361 // if there's something in the buffer waiting, then process it
// bufferProcessing is held true while this runs; onwrite() and uncork()
// both check that flag before calling clearBuffer, which prevents re-entry.
// NOTE(review): loop headers and several statements are elided in this
// excerpt; the notes below cover only the visible lines.
362 function clearBuffer(stream, state) {
363 state.bufferProcessing = true;
364 var entry = state.bufferedRequest;
// The vectored path needs a _writev implementation and at least two queued
// requests (entry and entry.next).
366 if (stream._writev && entry && entry.next) {
367 // Fast case, write everything using _writev()
371 cbs.push(entry.callback);
376 // count the one we are adding, as well.
377 // TODO(isaacs) clean this up
379 state.lastBufferedRequest = null;
// One doWrite call with writev=true and len=state.length; its completion
// callback loops over the collected per-request callbacks.
380 doWrite(stream, state, true, state.length, buffer, '', function(err) {
381 for (var i = 0; i < cbs.length; i++) {
389 // Slow case, write chunks one-by-one
391 var chunk = entry.chunk;
392 var encoding = entry.encoding;
393 var cb = entry.callback;
// Same length rule as writeOrBuffer: 1 per chunk in objectMode.
394 var len = state.objectMode ? 1 : chunk.length;
396 doWrite(stream, state, false, len, chunk, encoding, cb);
398 // if we didn't call the onwrite immediately, then
399 // it means that we need to wait until it does.
400 // also, that means that the chunk and cb are currently
401 // being processed, so move the buffer counter past them.
408 state.lastBufferedRequest = null;
// Whatever is left unprocessed becomes the new head of the queue.
410 state.bufferedRequest = entry;
411 state.bufferProcessing = false;
// Default _write: reports a 'not implemented' error through cb. It is meant
// to be replaced — the constructor installs options.write in its place when
// one is supplied.
414 Writable.prototype._write = function(chunk, encoding, cb) {
415 cb(new Error('not implemented'));
// No vectored write by default; clearBuffer only takes its _writev fast path
// when this property is set to something truthy.
418 Writable.prototype._writev = null;
// Ends the stream, optionally writing one last chunk first.
//   chunk    - optional final chunk; written via write() unless it is null
//              or undefined.
//   encoding - optional encoding for a string chunk.
//   cb       - optional callback, forwarded to endWritable.
// A function in the `chunk` or `encoding` position is detected via typeof.
// NOTE(review): the argument shuffling and the uncork logic are elided in
// this excerpt.
420 Writable.prototype.end = function(chunk, encoding, cb) {
421 var state = this._writableState;
423 if (typeof chunk === 'function') {
427 } else if (typeof encoding === 'function') {
432 if (chunk !== null && chunk !== undefined)
433 this.write(chunk, encoding);
435 // .end() fully uncorks
441 // ignore unnecessary end() calls.
442 if (!state.ending && !state.finished)
443 endWritable(this, state, cb);
// Tells whether the stream is ready to finish. The visible conditions
// require that end() has started (state.ending), nothing is left to write
// (length 0) and no request is queued.
// NOTE(review): the rest of the condition is elided in this excerpt.
447 function needFinish(state) {
448 return (state.ending &&
449 state.length === 0 &&
450 state.bufferedRequest === null &&
// Emits 'prefinish' at most once per stream: state.prefinished is set
// before the event is emitted, and later calls are no-ops.
455 function prefinish(stream, state) {
456 if (!state.prefinished) {
457 state.prefinished = true;
458 stream.emit('prefinish');
// Emits 'finish' once the stream is ready for it. In the visible code, when
// no user write callbacks are pending (pendingcb === 0) it runs prefinish,
// marks the state finished and emits 'finish'; the other visible branch
// only runs prefinish.
// NOTE(review): the use of `need` and the return value are not visible in
// this excerpt — confirm in the full file.
462 function finishMaybe(stream, state) {
463 var need = needFinish(state);
465 if (state.pendingcb === 0) {
466 prefinish(stream, state);
467 state.finished = true;
468 stream.emit('finish');
470 prefinish(stream, state);
476 function endWritable(stream, state, cb) {
478 finishMaybe(stream, state);
481 process.nextTick(cb);
483 stream.once('finish', cb);