Add Stream base class with stream.pipe
authorRyan Dahl <ry@tinyclouds.org>
Mon, 11 Oct 2010 00:21:36 +0000 (17:21 -0700)
committerRyan Dahl <ry@tinyclouds.org>
Mon, 11 Oct 2010 00:27:03 +0000 (17:27 -0700)
lib/http.js
lib/net.js
lib/stream.js [new file with mode: 0644]
src/node.cc
test/disabled/pipe-test.js [new file with mode: 0644]

index 5edb253..20b7b0f 100755 (executable)
@@ -9,7 +9,7 @@ if (debugLevel & 0x4) {
 }
 
 var net = require('net');
-var events = require('events');
+var Stream = require('stream').Stream;
 
 var FreeList = require('freelist').FreeList;
 var HTTPParser = process.binding('http_parser').HTTPParser;
@@ -185,7 +185,7 @@ var continueExpression = /100-continue/i;
 
 /* Abstract base class for ServerRequest and ClientResponse. */
 function IncomingMessage (socket) {
-  events.EventEmitter.call(this);
+  Stream.call(this);
 
   // TODO Remove one of these eventually.
   this.socket = socket;
@@ -205,7 +205,7 @@ function IncomingMessage (socket) {
   this.statusCode = null;
   this.client = this.socket;
 }
-sys.inherits(IncomingMessage, events.EventEmitter);
+sys.inherits(IncomingMessage, Stream);
 exports.IncomingMessage = IncomingMessage;
 
 IncomingMessage.prototype._parseQueryString = function () {
@@ -282,7 +282,7 @@ IncomingMessage.prototype._addHeaderLine = function (field, value) {
 };
 
 function OutgoingMessage (socket) {
-  events.EventEmitter.call(this, socket);
+  Stream.call(this);
 
   // TODO Remove one of these eventually.
   this.socket = socket;
@@ -301,7 +301,7 @@ function OutgoingMessage (socket) {
 
   this.finished = false;
 }
-sys.inherits(OutgoingMessage, events.EventEmitter);
+sys.inherits(OutgoingMessage, Stream);
 exports.OutgoingMessage = OutgoingMessage;
 
 // This abstract either writing directly to the socket or buffering it.
index af6fc31..c1ec5f4 100644 (file)
@@ -1,6 +1,7 @@
 var sys = require("sys");
 var fs = require("fs");
 var events = require("events");
+var stream = require("stream");
 var dns = require('dns');
 
 var kMinPoolSpace = 128;
@@ -519,7 +520,7 @@ function initStream (self) {
 
 function Stream (fd, type) {
   if (!(this instanceof Stream)) return new Stream(fd, type);
-  events.EventEmitter.call(this);
+  stream.Stream.call(this);
 
   this.fd = null;
   this.type = null;
@@ -531,7 +532,7 @@ function Stream (fd, type) {
     setImplmentationMethods(this);
   }
 };
-sys.inherits(Stream, events.EventEmitter);
+sys.inherits(Stream, stream.Stream);
 exports.Stream = Stream;
 
 
diff --git a/lib/stream.js b/lib/stream.js
new file mode 100644 (file)
index 0000000..6ab1f89
--- /dev/null
@@ -0,0 +1,57 @@
+var events = require('events');
+var inherits = require('sys').inherits;
+
+function Stream () {
+  events.EventEmitter.call(this);
+}
+inherits(Stream, events.EventEmitter);
+exports.Stream = Stream;
+
+Stream.prototype.pipe = function (dest, options) {
+  var source = this;
+
+  source.on("data", function (chunk) {
+    if (false === dest.write(chunk)) source.pause();
+  });
+
+  dest.on("drain", function () {
+    if (source.readable) source.resume();
+  });
+
+  /*
+   * If the 'end' option is not supplied, dest.end() will be called when
+   * source gets the 'end' event.
+   */
+
+  options.end = options && options.end === false ? false : true;
+
+  if (options.end) {
+    source.on("end", function () {
+      dest.end();
+    });
+  }
+
+  /*
+   * Questionable:
+   */
+
+  if (!source.pause) {
+    source.pause = function () {
+      source.emit("pause");
+    };
+  }
+
+  if (!source.resume) {
+    source.resume = function () {
+      source.emit("resume");
+    };
+  }
+
+  dest.on("pause", function () {
+    source.pause();
+  });
+
+  dest.on("resume", function () {
+    if (source.readable) source.resume();
+  });
+};
index 5fbdba8..99252f9 100644 (file)
@@ -1519,6 +1519,7 @@ static Handle<Value> Binding(const Arguments& args) {
     exports->Set(String::New("utils"),        String::New(native_utils));
     exports->Set(String::New("path"),         String::New(native_path));
     exports->Set(String::New("string_decoder"), String::New(native_string_decoder));
+    exports->Set(String::New("stream"),       String::New(native_stream));
     binding_cache->Set(module, exports);
   } else {
 
diff --git a/test/disabled/pipe-test.js b/test/disabled/pipe-test.js
new file mode 100644 (file)
index 0000000..63ef731
--- /dev/null
@@ -0,0 +1,20 @@
+/* 
+ * try with
+ * curl  -d @/usr/share/dict/words http://localhost:8000/123 
+ */
+
+http = require('http');
+
+s = http.Server(function (req, res) {
+  console.log(req.headers);
+
+  req.pipe(process.stdout, { end: false });
+
+  req.on('end', function () {
+    res.writeHead(200);
+    res.write("thanks");
+    res.end();
+  });
+});
+
+s.listen(8000);