/* Copyright 2015-present Samsung Electronics Co., Ltd. and other contributors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17 var Stream = require('stream').Stream;
18 var util = require('util');
19 var assert = require('assert');
/**
 * Internal bookkeeping record for one Readable stream.
 *
 * @param {Object} [options] Optional settings.
 * @param {string} [options.defaultEncoding] Encoding used to convert pushed
 *     strings into buffers when push() is called without an explicit
 *     encoding. Falls back to 'utf8'.
 */
function ReadableState(options) {
  options = options || {};

  // the internal array of buffers.
  this.buffer = [];

  // the sum of length of buffers.
  this.length = 0;

  this.defaultEncoding = options.defaultEncoding || 'utf8';

  // true if in flowing mode.
  // NOTE(review): initial value restored as `false` (paused) — confirm.
  this.flowing = false;

  // becomes `true` when the stream meets EOF.
  this.ended = false;

  // becomes `true` just before emitting the 'end' event.
  this.endEmitted = false;
}
/**
 * Readable stream constructor. May be called with or without `new`.
 *
 * @param {Object} [options] Forwarded to ReadableState.
 */
function Readable(options) {
  // Allow `Readable(options)` as well as `new Readable(options)`.
  if (!(this instanceof Readable)) {
    return new Readable(options);
  }

  this._readableState = new ReadableState(options);

  // NOTE(review): base-constructor call restored from the usual
  // util.inherits pattern — confirm.
  Stream.call(this);
}

util.inherits(Readable, Stream);
/**
 * Pulls buffered data out of the stream.
 *
 * A non-numeric `n`, or an `n` larger than the buffered amount, is treated
 * as "everything that is buffered". Emits 'end' (through emitEnd) once the
 * stream has met EOF and nothing is left in the buffer.
 *
 * @param {number} [n] Number of bytes wanted.
 * @return {Buffer|null} The data read, or null when there is nothing to read.
 * @throws {Error} 'not implemented' (raised by readBuffer) when `n` is
 *     positive but smaller than the buffered amount.
 */
Readable.prototype.read = function(n) {
  var state = this._readableState;
  var res;

  if (!util.isNumber(n) || n > state.length) {
    n = state.length;
  } else if (n < 0) {
    // NOTE(review): negative sizes are treated as "read nothing" — confirm.
    n = 0;
  }

  if (n > 0) {
    res = readBuffer(this, n);
  } else {
    res = null;
  }

  if (state.ended && state.length === 0) {
    emitEnd(this);
  }

  return res;
};
/**
 * Registers an event listener through Stream.prototype.on.
 *
 * @param {string} ev Event name.
 * @param {Function} cb Listener.
 * @return {*} Whatever Stream.prototype.on returns.
 */
Readable.prototype.on = function(ev, cb) {
  var res = Stream.prototype.on.call(this, ev, cb);

  // NOTE(review): switching to flowing mode when a 'data' listener is added
  // follows the Node.js Readable contract; restored here — confirm.
  if (ev === 'data') {
    this.resume();
  }

  return res;
};
/**
 * Reports whether the stream is currently not in flowing mode.
 *
 * @return {boolean} true when `flowing` is falsy.
 */
Readable.prototype.isPaused = function() {
  return !this._readableState.flowing;
};
/**
 * Leaves flowing mode; pushed chunks are buffered from now on.
 *
 * @return {Readable} this, for chaining.
 */
Readable.prototype.pause = function() {
  var state = this._readableState;

  if (state.flowing) {
    state.flowing = false;
    // NOTE(review): 'pause' notification restored from the Node.js Readable
    // contract — confirm.
    this.emit('pause');
  }

  return this;
};
/**
 * Enters flowing mode. Anything already buffered is drained and delivered
 * as a single 'data' event.
 *
 * @return {Readable} this, for chaining.
 */
Readable.prototype.resume = function() {
  var state = this._readableState;

  if (!state.flowing) {
    state.flowing = true;
    if (state.length > 0) {
      // readBuffer() without a size drains the whole buffer.
      emitData(this, readBuffer(this));
    }
  }

  return this;
};
/**
 * Emits an 'error' event on this stream.
 *
 * @param {*} error Value passed to the 'error' listeners.
 */
Readable.prototype.error = function(error) {
  emitError(this, error);
};
/**
 * Feeds a chunk into the stream.
 *
 * - A chunk that is not a string, a Buffer or null emits 'error' with a
 *   TypeError.
 * - null marks EOF.
 * - Any chunk after EOF emits 'error'.
 * - Strings are converted to a Buffer using `encoding`, or the stream's
 *   default encoding when none is given.
 * - In flowing mode the chunk is delivered immediately as 'data'; otherwise
 *   it is buffered and 'readable' is emitted.
 *
 * @param {string|Buffer|null} chunk Data to push, or null for EOF.
 * @param {string} [encoding] Encoding of `chunk` when it is a string.
 */
Readable.prototype.push = function(chunk, encoding) {
  var state = this._readableState;

  if (!util.isString(chunk) &&
      !util.isBuffer(chunk) &&
      !util.isNull(chunk)) {
    emitError(this, new TypeError('Invalid chunk'));
  } else if (util.isNull(chunk)) {
    onEof(this);
  } else if (state.ended) {
    emitError(this, new Error('stream.push() after EOF'));
  } else {
    if (util.isString(chunk)) {
      encoding = encoding || state.defaultEncoding;
      chunk = new Buffer(chunk, encoding);
    }

    if (state.flowing) {
      emitData(this, chunk);
    } else {
      state.length += chunk.length;
      state.buffer.push(chunk);
      emitReadable(this);
    }
  }
};
/**
 * Removes data from the stream's internal buffer.
 *
 * @param {Object} stream Stream whose `_readableState` is read and updated.
 * @param {number} [n] Bytes wanted; 0, null or undefined means "everything".
 * @return {Buffer|null} All buffered chunks concatenated, or null when the
 *     buffer is empty.
 * @throws {Error} 'not implemented' when fewer bytes than are buffered are
 *     requested (partial reads are unsupported).
 */
function readBuffer(stream, n) {
  var state = stream._readableState;
  var res;

  if (n === 0 || util.isNullOrUndefined(n)) {
    n = state.length;
  }

  if (state.buffer.length === 0 || state.length === 0) {
    res = null;
  } else if (n >= state.length) {
    res = Buffer.concat(state.buffer);
    // Everything was handed out, so reset the bookkeeping.
    state.buffer = [];
    state.length = 0;
  } else {
    throw new Error('not implemented');
  }

  return res;
}
/**
 * Emits 'end' on the stream, at most once.
 *
 * @param {Object} stream Stream whose `_readableState` is inspected.
 * @throws {Error} When data is still buffered or EOF has not been reached.
 */
function emitEnd(stream) {
  var state = stream._readableState;

  // The buffered byte count lives on the state object, not on the stream;
  // checking `stream.length` would never detect leftover data.
  if (state.length > 0 || !state.ended) {
    throw new Error('stream ended on non-EOF stream');
  }

  if (!state.endEmitted) {
    // Set the flag first so 'end' is never emitted twice.
    state.endEmitted = true;
    stream.emit('end');
  }
}
/**
 * Emits a 'readable' event on the stream.
 *
 * @param {Object} stream Emitter to notify.
 */
function emitReadable(stream) {
  stream.emit('readable');
}
/**
 * Delivers a chunk to 'data' listeners, then emits 'end' if the stream has
 * met EOF and nothing is buffered.
 *
 * @param {Object} stream Stream to emit on.
 * @param {*} data Payload passed to the 'data' listeners.
 * @throws {AssertionError} When data is still sitting in the internal buffer.
 */
function emitData(stream, data) {
  var state = stream._readableState;

  // Direct delivery is only valid when nothing is buffered: readBuffer()
  // returns null exactly when the internal buffer is empty.
  assert.equal(readBuffer(stream), null);
  stream.emit('data', data);

  if (state.ended && state.length === 0) {
    emitEnd(stream);
  }
}
/**
 * Emits an 'error' event on the stream.
 *
 * @param {Object} stream Emitter to notify.
 * @param {*} er Value passed to the 'error' listeners.
 */
function emitError(stream, er) {
  stream.emit('error', er);
}
/**
 * Marks the stream as having met EOF. If nothing is buffered, 'end' is
 * emitted right away.
 *
 * @param {Object} stream Stream whose `_readableState` is updated.
 */
function onEof(stream) {
  var state = stream._readableState;

  state.ended = true;

  if (state.length === 0) {
    emitEnd(stream);
  }
}
// The module's only export is the Readable constructor.
module.exports = Readable;