4 var util = require('util');
5 var Stream = require('stream');
/**
 * ChunkStream constructor; the same function is assigned to module.exports.
 *
 * NOTE(review): the methods on the prototype read and write this._buffers,
 * this._buffered, this._reads, this._paused and this.writable. Their
 * initialisation is not visible in this excerpt - presumably it happens in
 * the constructor lines that are not shown; confirm.
 */
8 var ChunkStream = module.exports = function() {
// Default encoding that write() falls back to when the caller passes none.
17 this._encoding = 'utf8';
// Make ChunkStream inherit from the core Stream constructor.
20 util.inherits(ChunkStream, Stream);
/**
 * Register a read request.
 *
 * The request records the absolute value of `length` plus an `allowLess`
 * flag that is true when `length` was negative.
 *
 * @param {number} length - Amount of data wanted. A negative value means
 *   "at most |length|" rather than "exactly length".
 * @param {Function} callback - NOTE(review): presumably stored on the request
 *   as `func` (the _processRead* methods call `read.func`), but the line that
 *   stores it is not visible here; confirm.
 */
23 ChunkStream.prototype.read = function(length, callback) {
26 length: Math.abs(length), // if length < 0 then at most this length
27 allowLess: length < 0,
// The follow-up work is deferred via process.nextTick instead of running inline.
31 process.nextTick(function() {
34 // if it's paused and there is not enough data (reads still pending), then ask for more
35 if (this._paused && this._reads.length > 0) {
/**
 * Append data to the internal buffer queue and update the buffered byte count.
 *
 * @param {Buffer|string} data - Data to queue. Non-Buffer input is converted
 *   with `new Buffer(data, encoding)`.
 * @param {string} [encoding] - Encoding for non-Buffer data; defaults to
 *   this._encoding.
 * @returns {boolean} `this.writable && !this._paused` on the path shown here.
 */
43 ChunkStream.prototype.write = function(data, encoding) {
// NOTE(review): presumably guarded by a "not writable" check on a line that
// is not visible in this excerpt - confirm.
46 this.emit('error', new Error('Stream not writable'));
51 if (Buffer.isBuffer(data)) {
// NOTE(review): the `new Buffer(...)` constructor is deprecated in current
// Node.js; Buffer.from(data, encoding) is the documented replacement.
55 dataBuffer = new Buffer(data, encoding || this._encoding);
// Queue the chunk and account for its size.
58 this._buffers.push(dataBuffer);
59 this._buffered += dataBuffer.length;
63 // ok if there are no more read requests
64 if (this._reads && this._reads.length === 0) {
// Callers get `false` once the stream is no longer writable or is paused.
68 return this.writable && !this._paused;
/**
 * Finish writing: forwards optional final data to write(), then marks the
 * stream as no longer writable.
 *
 * @param {Buffer|string} [data] - Final chunk, forwarded to write().
 *   NOTE(review): presumably only written when provided; the guard is not
 *   visible here - confirm.
 * @param {string} [encoding] - Forwarded to write().
 */
71 ChunkStream.prototype.end = function(data, encoding) {
74 this.write(data, encoding);
77 this.writable = false;
84 // enqueue or handle end
85 if (this._buffers.length === 0) {
// Otherwise a null entry is appended to the buffer queue - apparently an
// end-of-input marker behind the data still queued (TODO confirm).
89 this._buffers.push(null);
// destroySoon is an alias: it references the same function object as end().
94 ChunkStream.prototype.destroySoon = ChunkStream.prototype.end;
/**
 * Internal end-of-input handling.
 *
 * When read requests are still queued, an 'Unexpected end of input' Error is
 * created. NOTE(review): presumably it is emitted as an 'error' event; the
 * surrounding call is not visible in this excerpt - confirm.
 */
96 ChunkStream.prototype._end = function() {
98 if (this._reads.length > 0) {
100 new Error('Unexpected end of input')
/**
 * Tear the stream down: marks it as not writable and releases the buffer
 * queue by setting this._buffers to null.
 */
107 ChunkStream.prototype.destroy = function() {
// A missing buffer queue is how an already-destroyed stream is detected
// (this method nulls _buffers below). NOTE(review): presumably returns
// early in that case; the branch body is not visible - confirm.
109 if (!this._buffers) {
113 this.writable = false;
115 this._buffers = null;
/**
 * Satisfy an "at most N bytes" read using only the first queued buffer.
 *
 * If that buffer is longer than read.length, the first read.length bytes are
 * handed to the callback and the remainder stays at the head of the queue.
 * Otherwise the whole buffer is handed over and removed from the queue.
 * The callback (`read.func`) is invoked with `this` set to the stream.
 *
 * @param {{length: number, func: Function}} read - The request being served;
 *   the first comment below states it is the head of this._reads.
 */
120 ChunkStream.prototype._processReadAllowingLess = function(read) {
121 // ok, there is some data, so we can satisfy this request
122 this._reads.shift(); // == read
124 // first we need to peek into the first buffer
125 var smallerBuf = this._buffers[0];
127 // ok, there is more data than we need
128 if (smallerBuf.length > read.length) {
130 this._buffered -= read.length;
131 this._buffers[0] = smallerBuf.slice(read.length);
133 read.func.call(this, smallerBuf.slice(0, read.length));
137 // ok, this is no more than the maximum length, so use it all
138 this._buffered -= smallerBuf.length;
139 this._buffers.shift(); // == smallerBuf
141 read.func.call(this, smallerBuf);
/**
 * Satisfy an exact-length read by copying read.length bytes out of the
 * queued buffers into one newly allocated Buffer, then passing that Buffer
 * to the callback (`read.func`, invoked with `this` set to the stream).
 *
 * NOTE(review): `pos` and `count` are declared on lines not visible in this
 * excerpt - presumably both start at 0 and `pos` is advanced by `len` inside
 * the loop; confirm.
 *
 * @param {{length: number, func: Function}} read - The request being served.
 */
145 ChunkStream.prototype._processRead = function(read) {
146 this._reads.shift(); // == read
// NOTE(review): `new Buffer(size)` is deprecated in current Node.js;
// Buffer.alloc / Buffer.allocUnsafe are the documented replacements.
150 var data = new Buffer(read.length);
152 // create buffer for all data
153 while (pos < read.length) {
155 var buf = this._buffers[count++];
// Copy no more than what is still missing from the result.
156 var len = Math.min(buf.length, read.length - pos);
158 buf.copy(data, pos, 0, len);
161 // the last buffer wasn't fully used, so keep only its unread tail
162 if (len !== buf.length) {
// Stepping `count` back keeps this partially consumed buffer out of the
// splice below.
163 this._buffers[--count] = buf.slice(len);
167 // remove all used buffers
169 this._buffers.splice(0, count);
172 this._buffered -= read.length;
174 read.func.call(this, data);
/**
 * Serve queued read requests from buffered data.
 *
 * Looks at the request at the head of this._reads and dispatches it to
 * _processReadAllowingLess (when read.allowLess is set) or to _processRead
 * (when at least read.length bytes are buffered).
 *
 * NOTE(review): several structural lines (else/break, and whatever binds
 * `ex`) are not visible in this excerpt; the comments below describe only
 * what the visible lines show.
 */
177 ChunkStream.prototype._process = function() {
180 // as long as there is any data and read requests
181 while (this._buffered > 0 && this._reads && this._reads.length > 0) {
// Peek only; the _processRead* helpers shift the request off the queue.
183 var read = this._reads[0];
185 // read any data (but no more than length)
186 if (read.allowLess) {
187 this._processReadAllowingLess(read);
190 else if (this._buffered >= read.length) {
191 // ok we can meet some expectations
193 this._processRead(read);
196 // not enough data to satisfy the first request in the queue,
197 // so we need to wait for more
// Buffers still present but the stream is no longer writable.
// NOTE(review): presumably this is where end-of-input handling is triggered;
// the branch body is not visible - confirm.
202 if (this._buffers && !this.writable) {
// NOTE(review): `ex` is presumably bound by a catch clause that is not
// visible here; it is re-published as an 'error' event.
207 this.emit('error', ex);