Upload packaging folder
[platform/upstream/iotjs.git] / tools / src / js / stream_readable.js
1 /* Copyright 2015-present Samsung Electronics Co., Ltd. and other contributors
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16
17 var Stream = require('stream').Stream;
18 var util = require('util');
19 var assert = require('assert');
20
21
22 function ReadableState(options) {
23   options = options || {};
24
25   // the internal array of buffers.
26   this.buffer = [];
27
28   // the sum of length of buffers.
29   this.length = 0;
30
31   this.defaultEncoding = options.defaultEncoding || 'utf8';
32
33   // true if in flowing mode.
34   this.flowing = false;
35
36   // become `true` when the stream meet EOF.
37   this.ended = false;
38
39   // become `true` just before emit 'end' event.
40   this.endEmitted = false;
41 };
42
43
44 function Readable(options) {
45   if (!(this instanceof Readable)) {
46     return new Readable(options);
47   }
48
49   this._readableState = new ReadableState(options);
50
51   Stream.call(this);
52 };
53
54 util.inherits(Readable, Stream);
55
56
57 Readable.prototype.read = function(n) {
58   var state = this._readableState;
59   var res;
60
61   if (!util.isNumber(n) || n > state.length) {
62     n = state.length;
63   } else if (n < 0) {
64     n = 0;
65   }
66
67   if (n > 0) {
68     res = readBuffer(this, n);
69   } else {
70     res = null;
71   }
72
73   if (state.ended && state.length == 0) {
74     emitEnd(this);
75   }
76
77   return res;
78 };
79
80
81 Readable.prototype.on = function(ev, cb) {
82   var res = Stream.prototype.on.call(this, ev, cb);
83   if (ev === 'data') {
84     this.resume();
85   }
86   return res;
87 };
88
89
90 Readable.prototype.isPaused = function() {
91   return !this._readableState.flowing;
92 };
93
94
95 Readable.prototype.pause = function() {
96   var state = this._readableState;
97   if (state.flowing) {
98     state.flowing = false;
99     this.emit('pause');
100   }
101   return this;
102 };
103
104
105 Readable.prototype.resume = function() {
106   var state = this._readableState;
107   if (!state.flowing) {
108     state.flowing = true;
109     if (state.length > 0) {
110       emitData(this, readBuffer(this));
111     }
112   }
113   return this;
114 };
115
116
117 Readable.prototype.error = function(error) {
118   emitError(this, error);
119 };
120
121
122 Readable.prototype.push = function(chunk, encoding) {
123   var state = this._readableState;
124
125   if (!util.isString(chunk) &&
126       !util.isBuffer(chunk) &&
127       !util.isNull(chunk)) {
128     emitError(this, TypeError('Invalid chunk'));
129   } else if (util.isNull(chunk)) {
130     onEof(this);
131   } else if (state.ended) {
132     emitError(this, Error('stream.push() after EOF'));
133   } else {
134     if (util.isString(chunk)) {
135       encoding = encoding || state.defaultEncoding;
136       chunk = new Buffer(chunk, encoding);
137     }
138     if (state.flowing) {
139       emitData(this, chunk);
140     } else {
141       state.length += chunk.length;
142       state.buffer.push(chunk);
143       emitReadable(this);
144     }
145   }
146 };
147
148
149 function readBuffer(stream, n) {
150   var state = stream._readableState;
151   var res;
152
153   if (n == 0 || util.isNullOrUndefined(n)) {
154     n = state.length;
155   }
156
157   if (state.buffer.length === 0 || state.length === 0) {
158     res = null;
159   } else if (n >= state.length) {
160     res = Buffer.concat(state.buffer);
161     state.buffer = [];
162     state.length = 0;
163   } else {
164     throw new Error('not implemented');
165   }
166
167   return res;
168 };
169
170
171 function emitEnd(stream) {
172   var state = stream._readableState;
173
174   if (stream.length > 0 || !state.ended) {
175     throw new Error('stream ended on non-EOF stream');
176   }
177   if (!state.endEmitted) {
178     state.endEmitted = true;
179     stream.emit('end');
180   }
181 };
182
183
184 function emitReadable(stream) {
185   stream.emit('readable');
186 };
187
188
189 function emitData(stream, data) {
190   var state = stream._readableState;
191
192   assert.equal(readBuffer(stream), null);
193   stream.emit('data', data);
194
195   if (state.ended && state.length == 0) {
196     emitEnd(stream);
197   }
198 };
199
200
201 function emitError(stream, er) {
202   stream.emit('error', er);
203 };
204
205
206 function onEof(stream) {
207   var state = stream._readableState;
208
209   state.ended = true;
210
211   if (state.length == 0) {
212     emitEnd(stream);
213   }
214 };
215
216
217 module.exports = Readable;