From: Ryan Dahl Date: Wed, 16 Dec 2009 12:50:28 +0000 (+0100) Subject: Implement stream.send() X-Git-Tag: v0.1.92~84^2~117 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=bddd6e9ca33a069140501b76bdb336fa9cd65d07;p=platform%2Fupstream%2Fnodejs.git Implement stream.send() --- diff --git a/src/node_buffer.cc b/src/node_buffer.cc index f211283..584ed18 100644 --- a/src/node_buffer.cc +++ b/src/node_buffer.cc @@ -254,6 +254,90 @@ static Handle Slice(const Arguments &args) { return scope.Close(slice); } + +// var charsWritten = buffer.utf8Write(string, offset, length); +static Handle Utf8Write(const Arguments &args) { + HandleScope scope; + + struct buffer *buffer = BufferUnwrap(args.This()); + + if (!args[0]->IsString()) { + return ThrowException(Exception::TypeError(String::New( + "Argument must be a string"))); + } + + Local s = args[0]->ToString(); + + size_t offset = args[1]->Int32Value(); + + char *p = buffer_p(buffer, offset); + if (buffer_p(buffer, offset) == NULL) { + return ThrowException(Exception::TypeError(String::New( + "Offset is out of bounds"))); + } + + size_t toWrite = args[2]->Int32Value(); + + if (buffer_remaining(buffer, offset) < toWrite) { + return ThrowException(Exception::TypeError(String::New( + "Length is out of bounds"))); + } + + int written = s->WriteUtf8(p, toWrite); + + return scope.Close(Integer::New(written)); +} + + +// var charsWritten = buffer.asciiWrite(string, offset, length); +static Handle AsciiWrite(const Arguments &args) { + HandleScope scope; + + struct buffer *buffer = BufferUnwrap(args.This()); + + if (!args[0]->IsString()) { + return ThrowException(Exception::TypeError(String::New( + "Argument must be a string"))); + } + + Local s = args[0]->ToString(); + + size_t offset = args[1]->Int32Value(); + + char *p = buffer_p(buffer, offset); + if (buffer_p(buffer, offset) == NULL) { + return ThrowException(Exception::TypeError(String::New( + "Offset is out of bounds"))); + } + + size_t toWrite = args[2]->Int32Value(); + + if (buffer_remaining(buffer, offset) < toWrite) { + return ThrowException(Exception::TypeError(String::New( + "Length is out of bounds"))); + } + + // TODO Expose the second argument of WriteAscii? + // Could avoid doing slices when the string doesn't fit in a buffer. V8 + // slice() does copy the string, so exposing that argument would help. + + int written = s->WriteAscii(p, 0, toWrite); + + return scope.Close(Integer::New(written)); +} + + +static Handle Utf8Length(const Arguments &args) { + HandleScope scope; + if (!args[0]->IsString()) { + return ThrowException(Exception::TypeError(String::New( + "Argument must be a string"))); + } + Local s = args[0]->ToString(); + return scope.Close(Integer::New(s->Utf8Length())); +} + + void InitBuffer(Handle target) { HandleScope scope; @@ -271,6 +355,11 @@ void InitBuffer(Handle target) { // copy NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Slice", Utf8Slice); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Write", Utf8Write); + NODE_SET_PROTOTYPE_METHOD(constructor_template, "asciiWrite", AsciiWrite); + + NODE_SET_METHOD(constructor_template->GetFunction(), "utf8Length", Utf8Length); + target->Set(String::NewSymbol("Buffer"), constructor_template->GetFunction()); } diff --git a/src/node_net2.cc b/src/node_net2.cc index 3d8a1fa..7b4a788 100644 --- a/src/node_net2.cc +++ b/src/node_net2.cc @@ -425,9 +425,9 @@ static Handle Read(const Arguments& args) { String::New("Length is extends beyond buffer"))); } - size_t bytes_read = read(fd, - buffer_p(buffer, off), - buffer_remaining(buffer, off)); + ssize_t bytes_read = read(fd, + buffer_p(buffer, off), + buffer_remaining(buffer, off)); if (bytes_read < 0) { if (errno == EAGAIN || errno == EINTR) return Null(); @@ -457,20 +457,20 @@ static Handle Write(const Arguments& args) { struct buffer * buffer = BufferUnwrap(args[1]); size_t off = args[2]->Int32Value(); - if (buffer_p(buffer, off) == NULL) { + char *p = buffer_p(buffer, off); + if (p == NULL) { return ThrowException(Exception::Error( String::New("Offset is out of bounds"))); } size_t len = args[3]->Int32Value(); - if (buffer_remaining(buffer, off) < len) { + size_t remaining = buffer_remaining(buffer, off); + if (remaining < len) { return ThrowException(Exception::Error( String::New("Length is extends beyond buffer"))); } - size_t written = write(fd, - buffer_p(buffer, off), - buffer_remaining(buffer, off)); + ssize_t written = write(fd, p, len); if (written < 0) { if (errno == EAGAIN || errno == EINTR) return Null(); diff --git a/tcp.js b/tcp.js index 6dd29f8..36416d2 100644 --- a/tcp.js +++ b/tcp.js @@ -16,15 +16,18 @@ var read = process.read; var write = process.write; var toRead = process.toRead; -var Peer = function (peerInfo) { +var Stream = function (peerInfo) { process.EventEmitter.call(); var self = this; - process.mixin(self, peerInfo); + self.fd = peerInfo.fd; + self.remoteAddress = peerInfo.remoteAddress; + self.remotePort = peerInfo.remotePort; // Allocated on demand. self.recvBuffer = null; + self.sendQueue = []; self.readWatcher = new process.IOWatcher(function () { debug("\n" + self.fd + " readable"); @@ -32,25 +35,25 @@ var Peer = function (peerInfo) { // If this is the first recv (recvBuffer doesn't exist) or we've used up // most of the recvBuffer, allocate a new one. if (!self.recvBuffer || - self.recvBuffer.length - self.recvBufferBytesUsed < 128) { + self.recvBuffer.length - self.recvBuffer.used < 128) { self._allocateNewRecvBuf(); } - debug("recvBufferBytesUsed " + self.recvBufferBytesUsed); + debug("recvBuffer.used " + self.recvBuffer.used); var bytesRead = read(self.fd, self.recvBuffer, - self.recvBufferBytesUsed, - self.recvBuffer.length - self.recvBufferBytesUsed); + self.recvBuffer.used, + self.recvBuffer.length - self.recvBuffer.used); debug("bytesRead " + bytesRead + "\n"); - if (bytesRead == 0) { + if (bytesRead == 0) { self.readable = false; self.readWatcher.stop(); self.emit("eof"); } else { - var slice = self.recvBuffer.slice(self.recvBufferBytesUsed, - self.recvBufferBytesUsed + bytesRead); - self.recvBufferBytesUsed += bytesRead; + var slice = self.recvBuffer.slice(self.recvBuffer.used, + self.recvBuffer.used + bytesRead); + self.recvBuffer.used += bytesRead; self.emit("receive", slice); } }); @@ -58,18 +61,16 @@ var Peer = function (peerInfo) { self.readWatcher.start(); self.writeWatcher = new process.IOWatcher(function () { - debug(self.fd + " writable"); + self.flush(); }); self.writeWatcher.set(self.fd, false, true); self.readable = true; self.writable = true; - - self._out = []; }; -process.inherits(Peer, process.EventEmitter); +process.inherits(Stream, process.EventEmitter); -Peer.prototype._allocateNewRecvBuf = function () { +Stream.prototype._allocateNewRecvBuf = function () { var self = this; var newBufferSize = 1024; // TODO make this adjustable from user API @@ -83,7 +84,7 @@ Peer.prototype._allocateNewRecvBuf = function () { } else if (bytesToRead == 0) { // Probably getting an EOF - so let's not allocate so much. if (self.recvBuffer && - self.recvBuffer.length - self.recvBufferBytesUsed > 0) { + self.recvBuffer.length - self.recvBuffer.used > 0) { return; // just recv the eof on the old buf. } newBufferSize = 128; @@ -91,10 +92,115 @@ Peer.prototype._allocateNewRecvBuf = function () { } self.recvBuffer = new process.Buffer(newBufferSize); - self.recvBufferBytesUsed = 0; + self.recvBuffer.used = 0; +}; + +Stream.prototype._allocateSendBuffer = function () { + var b = new process.Buffer(1024); + b.used = 0; + b.sent = 0; + this.sendQueue.push(b); + return b; +}; + +Stream.prototype.send = function (data, encoding) { + var self = this; + if (typeof(data) == "string") { + var buffer; + if (self.sendQueue.length == 0) { + buffer = self._allocateSendBuffer(); + } else { + // walk through the sendQueue, find the first empty buffer + for (var i = 0; i < self.sendQueue.length; i++) { + if (self.sendQueue[i].used == 0) { + buffer = self.sendQueue[i]; + break; + } + } + // if we didn't find one, take the last + if (!buffer) { + buffer = self.sendQueue[self.sendQueue.length-1]; + // if last buffer is empty + if (buffer.length == buffer.used) buffer = self._allocateSendBuffer(); + } + } + + encoding = encoding || "ascii"; // default to ascii since it's faster + + var charsWritten; + + if (encoding.toLowerCase() == "utf8") { + charsWritten = buffer.utf8Write(data, + buffer.used, + buffer.length - buffer.used); + buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten)); + } else { + // ascii + charsWritten = buffer.asciiWrite(data, + buffer.used, + buffer.length - buffer.used); + buffer.used += charsWritten; + debug("ascii charsWritten " + charsWritten); + debug("ascii buffer.used " + buffer.used); + } + + + // If we didn't finish, then recurse with the rest of the string. + if (charsWritten < data.length) { + debug("recursive send"); + self.send(data.slice(charsWritten), encoding); + } + } else { + // data is a process.Buffer + + // walk through the sendQueue, find the first empty buffer + var inserted = false; + data.sent = 0; + data.used = data.length; + for (var i = 0; i < self.sendQueue.length; i++) { + if (self.sendQueue[i].used == 0) { + // if found, insert the data there + self.sendQueue.splice(i, 0, data); + inserted = true; + break; + } + } + + if (!inserted) self.sendQueue.push(data); + } + this.flush(); +}; + +// returns true if flushed without getting EAGAIN +// false if it got EAGAIN +Stream.prototype.flush = function () { + var self = this; + var bytesWritten; + while (self.sendQueue.length > 0) { + var b = self.sendQueue[0]; + + if (b.sent == b.used) { + // this can be improved - save the buffer for later? + self.sendQueue.shift() + continue; + } + + bytesWritten = write(self.fd, + b, + b.sent, + b.used - b.sent); + if (bytesWritten === null) { + this.writeWatcher.start(); + return false; + } + b.sent += bytesWritten; + debug("bytes sent: " + b.sent); + } + this.writeWatcher.stop(); + return true; }; -Peer.prototype.close = function () { +Stream.prototype.close = function () { this.readable = false; this.writable = false; @@ -113,14 +219,11 @@ var Server = function (listener) { } self.watcher = new process.IOWatcher(function (readable, writeable) { - debug("readable " + readable); - debug("writable " + writeable); while (self.fd) { - debug("accept from " + self.fd); var peerInfo = accept(self.fd); debug("accept: " + JSON.stringify(peerInfo)); if (!peerInfo) return; - var peer = new Peer(peerInfo); + var peer = new Stream(peerInfo); self.emit("connection", peer); } }); @@ -132,6 +235,7 @@ Server.prototype.listen = function () { if (self.fd) throw new Error("Already running"); + var backlogIndex; if (typeof(arguments[0]) == "string" && arguments.length == 1) { // the first argument specifies a path self.fd = process.socket("UNIX"); @@ -141,13 +245,15 @@ Server.prototype.listen = function () { // unlink(SOCKFILE); // } bind(self.fd, arguments[0]); + backlogIndex = 1; } else { // the first argument is the port, the second an IP self.fd = process.socket("TCP"); // TODO dns resolution on arguments[1] bind(self.fd, arguments[0], arguments[1]); + backlogIndex = typeof(arguments[1]) == "string" ? 2 : 1; } - listen(self.fd, 128); // TODO configurable backlog + listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128); self.watcher.set(self.fd, true, false); self.watcher.start(); @@ -179,6 +285,7 @@ var server = new Server(function (peer) { peer.addListener("receive", function (b) { sys.puts("recv (" + b.length + "): " + b); + peer.send("pong\r\n"); }); }); //server.listen(8000);