1 var common = require('../common');
2 var assert = require('assert');
4 var stream = require('stream');
5 var Readable = stream.Readable;
6 var Writable = stream.Writable;
// Number of bytes the readable is expected to produce over the whole test;
// compared against totalPushed in the final 'finish' handler.
// (totalChunks and chunkSize are declared outside the visible lines.)
10 var expectTotalData = totalChunks * chunkSize;
// Number of bytes expected to reach the final writable. Starts at the total
// and is decremented by every step that consumes data itself (readn, the
// 'data' listener step, pipeLittle).
11 var expectEndingData = expectTotalData;
// Readable under test, created with a highWaterMark of 1000.
13 var r = new Readable({ highWaterMark: 1000 });
// Countdown of chunks still to be produced; decremented each time a chunk is
// built below.
14 var chunks = totalChunks;
15 r._read = function(n) {
// NOTE(review): the first branch of this conditional is not visible here.
// When `chunks` is a multiple of 3 the push is deferred with process.nextTick.
18 else if (!(chunks % 3))
19 process.nextTick(push);
// Build a chunkSize-byte Buffer while chunks remain, otherwise null
// (presumably pushed to signal end-of-stream; the push call is not visible).
// NOTE(review): `new Buffer(size)` is deprecated in current Node.js; consider
// Buffer.alloc(chunkSize) when this helper is next touched.
26 var chunk = chunks-- > 0 ? new Buffer(chunkSize) : null;
// Running count of bytes produced; checked against expectTotalData at the end.
// NOTE(review): `chunk` can be null here, so a guard is presumably on a line
// not visible in this excerpt; confirm.
28 totalPushed += chunk.length;
36 // first we read 100 bytes
// Reads `n` bytes from `r` and checks that exactly `n` bytes were obtained.
// `then` is the continuation for the next step of the test; its invocation is
// not among the visible lines.
41 function readn(n, then) {
42 console.error('read %d', n);
// These n bytes are consumed here, so they are not expected at the final
// writable.
43 expectEndingData -= n;
// Re-run the inner `read` helper on the next 'readable' event (presumably
// when no data was available yet; the surrounding condition is not visible).
47 r.once('readable', read);
// The chunk must be exactly n bytes long, and the stream's flowing flag must
// not be set at this point.
49 assert.equal(c.length, n);
50 assert(!r._readableState.flowing);
56 // then we listen to some data events
// The 100 bytes taken through 'data' events are excluded from what the final
// writable should receive.
58 expectEndingData -= 100;
59 console.error('onData');
// The listener is a named function expression (`od`) so it can remove itself.
61 r.on('data', function od(c) {
65 r.removeListener('data', od);
68 // oh no, seen too much!
69 // put the extra back.
// `diff` is how far `seen` overshot 100; the last `diff` bytes of the current
// chunk are handed back to the stream with unshift().
70 var diff = seen - 100;
71 r.unshift(c.slice(c.length - diff));
72 console.error('seen too much', seen, diff);
75 // Nothing should be lost in between
// Continue with the pipeLittle step via setImmediate.
76 setImmediate(pipeLittle);
81 // Just pipe 200 bytes, then unshift the extra and unpipe
82 function pipeLittle() {
// The 200 bytes handled by this temporary writable are excluded from what the
// final writable should receive.
83 expectEndingData -= 200;
84 console.error('pipe a little');
85 var w = new Writable();
// When the writable finishes, exactly 200 bytes must have been counted; the
// test then moves on to read1234 via setImmediate.
87 w.on('finish', function() {
88 assert.equal(written, 200);
89 setImmediate(read1234);
// Counts every incoming byte in `written` (declared outside the visible lines).
91 w._write = function(chunk, encoding, cb) {
92 written += chunk.length;
// `diff` is the overshoot past 200 bytes; the last `diff` bytes of this chunk
// are returned to `r` with unshift().
// NOTE(review): for the 'finish' assertion to hold, `written` must be reduced
// by the overshoot on a line not visible here; confirm.
98 var diff = written - 200;
100 r.unshift(chunk.slice(chunk.length - diff));
109 // now read 1234 more bytes
// Delegates to readn(), passing resumePause as the continuation for the
// following step.
110 function read1234() {
111 readn(1234, resumePause);
// Test step that follows the 1234-byte read (it is passed to readn as the
// continuation in read1234). The body is not among the visible lines.
114 function resumePause() {
115 console.error('resumePause');
116 // don't read anything, just resume and re-pause a whole bunch
// Final step: a fresh writable counts the bytes it receives and, on 'finish',
// the totals are verified. (The enclosing function header and the pipe() call
// are not among the visible lines.)
132 console.error('pipe the rest');
133 var w = new Writable();
// Counts every incoming byte in `written` (declared outside the visible lines).
135 w._write = function(chunk, encoding, cb) {
136 written += chunk.length;
139 w.on('finish', function() {
140 console.error('written', written, totalPushed);
// The writable must have received exactly the bytes not consumed by earlier
// steps, and the readable must have produced exactly the expected total.
141 assert.equal(written, expectEndingData);
142 assert.equal(totalPushed, expectTotalData);