}
var net = require('net');
-var events = require('events');
+var Stream = require('stream').Stream;
var FreeList = require('freelist').FreeList;
var HTTPParser = process.binding('http_parser').HTTPParser;
/* Abstract base class for ServerRequest and ClientResponse. */
function IncomingMessage (socket) {
- events.EventEmitter.call(this);
+ Stream.call(this);
// TODO Remove one of these eventually.
this.socket = socket;
this.statusCode = null;
this.client = this.socket;
}
-sys.inherits(IncomingMessage, events.EventEmitter);
+sys.inherits(IncomingMessage, Stream);
exports.IncomingMessage = IncomingMessage;
IncomingMessage.prototype._parseQueryString = function () {
};
function OutgoingMessage (socket) {
- events.EventEmitter.call(this, socket);
+ Stream.call(this);
// TODO Remove one of these eventually.
this.socket = socket;
this.finished = false;
}
-sys.inherits(OutgoingMessage, events.EventEmitter);
+sys.inherits(OutgoingMessage, Stream);
exports.OutgoingMessage = OutgoingMessage;
// This abstract either writing directly to the socket or buffering it.
var sys = require("sys");
var fs = require("fs");
var events = require("events");
+var stream = require("stream");
var dns = require('dns');
var kMinPoolSpace = 128;
function Stream (fd, type) {
if (!(this instanceof Stream)) return new Stream(fd, type);
- events.EventEmitter.call(this);
+ stream.Stream.call(this);
this.fd = null;
this.type = null;
setImplmentationMethods(this);
}
};
-sys.inherits(Stream, events.EventEmitter);
+sys.inherits(Stream, stream.Stream);
exports.Stream = Stream;
--- /dev/null
+var events = require('events');
+var inherits = require('sys').inherits;
+
+function Stream () {
+ events.EventEmitter.call(this);
+}
+inherits(Stream, events.EventEmitter);
+exports.Stream = Stream;
+
+Stream.prototype.pipe = function (dest, options) {
+ var source = this;
+
+ source.on("data", function (chunk) {
+ if (false === dest.write(chunk)) source.pause();
+ });
+
+ dest.on("drain", function () {
+ if (source.readable) source.resume();
+ });
+
+ /*
+ * If the 'end' option is not supplied, dest.end() will be called when
+ * source gets the 'end' event.
+ */
+
+ options.end = options && options.end === false ? false : true;
+
+ if (options.end) {
+ source.on("end", function () {
+ dest.end();
+ });
+ }
+
+ /*
+ * Questionable:
+ */
+
+ if (!source.pause) {
+ source.pause = function () {
+ source.emit("pause");
+ };
+ }
+
+ if (!source.resume) {
+ source.resume = function () {
+ source.emit("resume");
+ };
+ }
+
+ dest.on("pause", function () {
+ source.pause();
+ });
+
+ dest.on("resume", function () {
+ if (source.readable) source.resume();
+ });
+};
exports->Set(String::New("utils"), String::New(native_utils));
exports->Set(String::New("path"), String::New(native_path));
exports->Set(String::New("string_decoder"), String::New(native_string_decoder));
+ exports->Set(String::New("stream"), String::New(native_stream));
binding_cache->Set(module, exports);
} else {
--- /dev/null
+/*
+ * try with
+ * curl -d @/usr/share/dict/words http://localhost:8000/123
+ */
+
+http = require('http');
+
+s = http.Server(function (req, res) {
+ console.log(req.headers);
+
+ req.pipe(process.stdout, { end: false });
+
+ req.on('end', function () {
+ res.writeHead(200);
+ res.write("thanks");
+ res.end();
+ });
+});
+
+s.listen(8000);