From: Ryan Dahl Date: Mon, 11 Oct 2010 00:21:36 +0000 (-0700) Subject: Add Stream base class with stream.pipe X-Git-Tag: v0.3.0~85 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=bc695475b908ccf30e5016689328df37b678b870;p=platform%2Fupstream%2Fnodejs.git Add Stream base class with stream.pipe --- diff --git a/lib/http.js b/lib/http.js index 5edb253..20b7b0f 100755 --- a/lib/http.js +++ b/lib/http.js @@ -9,7 +9,7 @@ if (debugLevel & 0x4) { } var net = require('net'); -var events = require('events'); +var Stream = require('stream').Stream; var FreeList = require('freelist').FreeList; var HTTPParser = process.binding('http_parser').HTTPParser; @@ -185,7 +185,7 @@ var continueExpression = /100-continue/i; /* Abstract base class for ServerRequest and ClientResponse. */ function IncomingMessage (socket) { - events.EventEmitter.call(this); + Stream.call(this); // TODO Remove one of these eventually. this.socket = socket; @@ -205,7 +205,7 @@ function IncomingMessage (socket) { this.statusCode = null; this.client = this.socket; } -sys.inherits(IncomingMessage, events.EventEmitter); +sys.inherits(IncomingMessage, Stream); exports.IncomingMessage = IncomingMessage; IncomingMessage.prototype._parseQueryString = function () { @@ -282,7 +282,7 @@ IncomingMessage.prototype._addHeaderLine = function (field, value) { }; function OutgoingMessage (socket) { - events.EventEmitter.call(this, socket); + Stream.call(this); // TODO Remove one of these eventually. this.socket = socket; @@ -301,7 +301,7 @@ function OutgoingMessage (socket) { this.finished = false; } -sys.inherits(OutgoingMessage, events.EventEmitter); +sys.inherits(OutgoingMessage, Stream); exports.OutgoingMessage = OutgoingMessage; // This abstract either writing directly to the socket or buffering it. diff --git a/lib/net.js b/lib/net.js index af6fc31..c1ec5f4 100644 --- a/lib/net.js +++ b/lib/net.js @@ -1,6 +1,7 @@ var sys = require("sys"); var fs = require("fs"); var events = require("events"); +var stream = require("stream"); var dns = require('dns'); var kMinPoolSpace = 128; @@ -519,7 +520,7 @@ function initStream (self) { function Stream (fd, type) { if (!(this instanceof Stream)) return new Stream(fd, type); - events.EventEmitter.call(this); + stream.Stream.call(this); this.fd = null; this.type = null; @@ -531,7 +532,7 @@ function Stream (fd, type) { setImplmentationMethods(this); } }; -sys.inherits(Stream, events.EventEmitter); +sys.inherits(Stream, stream.Stream); exports.Stream = Stream; diff --git a/lib/stream.js b/lib/stream.js new file mode 100644 index 0000000..6ab1f89 --- /dev/null +++ b/lib/stream.js @@ -0,0 +1,57 @@ +var events = require('events'); +var inherits = require('sys').inherits; + +function Stream () { + events.EventEmitter.call(this); +} +inherits(Stream, events.EventEmitter); +exports.Stream = Stream; + +Stream.prototype.pipe = function (dest, options) { + var source = this; + + source.on("data", function (chunk) { + if (false === dest.write(chunk)) source.pause(); + }); + + dest.on("drain", function () { + if (source.readable) source.resume(); + }); + + /* + * If the 'end' option is not supplied, dest.end() will be called when + * source gets the 'end' event. + */ + + options.end = options && options.end === false ? false : true; + + if (options.end) { + source.on("end", function () { + dest.end(); + }); + } + + /* + * Questionable: + */ + + if (!source.pause) { + source.pause = function () { + source.emit("pause"); + }; + } + + if (!source.resume) { + source.resume = function () { + source.emit("resume"); + }; + } + + dest.on("pause", function () { + source.pause(); + }); + + dest.on("resume", function () { + if (source.readable) source.resume(); + }); +}; diff --git a/src/node.cc b/src/node.cc index 5fbdba8..99252f9 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1519,6 +1519,7 @@ static Handle Binding(const Arguments& args) { exports->Set(String::New("utils"), String::New(native_utils)); exports->Set(String::New("path"), String::New(native_path)); exports->Set(String::New("string_decoder"), String::New(native_string_decoder)); + exports->Set(String::New("stream"), String::New(native_stream)); binding_cache->Set(module, exports); } else { diff --git a/test/disabled/pipe-test.js b/test/disabled/pipe-test.js new file mode 100644 index 0000000..63ef731 --- /dev/null +++ b/test/disabled/pipe-test.js @@ -0,0 +1,20 @@ +/* + * try with + * curl -d @/usr/share/dict/words http://localhost:8000/123 + */ + +http = require('http'); + +s = http.Server(function (req, res) { + console.log(req.headers); + + req.pipe(process.stdout, { end: false }); + + req.on('end', function () { + res.writeHead(200); + res.write("thanks"); + res.end(); + }); +}); + +s.listen(8000);