return scope.Close(slice);
}
+
+// var charsWritten = buffer.utf8Write(string, offset, length);
+static Handle<Value> Utf8Write(const Arguments &args) {
+ HandleScope scope;
+
+ struct buffer *buffer = BufferUnwrap(args.This());
+
+ if (!args[0]->IsString()) {
+ return ThrowException(Exception::TypeError(String::New(
+ "Argument must be a string")));
+ }
+
+ Local<String> s = args[0]->ToString();
+
+ size_t offset = args[1]->Int32Value();
+
+ char *p = buffer_p(buffer, offset);
+ if (buffer_p(buffer, offset) == NULL) {
+ return ThrowException(Exception::TypeError(String::New(
+ "Offset is out of bounds")));
+ }
+
+ size_t toWrite = args[2]->Int32Value();
+
+ if (buffer_remaining(buffer, offset) < toWrite) {
+ return ThrowException(Exception::TypeError(String::New(
+ "Length is out of bounds")));
+ }
+
+ int written = s->WriteUtf8(p, toWrite);
+
+ return scope.Close(Integer::New(written));
+}
+
+
+// var charsWritten = buffer.asciiWrite(string, offset, length);
+static Handle<Value> AsciiWrite(const Arguments &args) {
+ HandleScope scope;
+
+ struct buffer *buffer = BufferUnwrap(args.This());
+
+ if (!args[0]->IsString()) {
+ return ThrowException(Exception::TypeError(String::New(
+ "Argument must be a string")));
+ }
+
+ Local<String> s = args[0]->ToString();
+
+ size_t offset = args[1]->Int32Value();
+
+ char *p = buffer_p(buffer, offset);
+ if (buffer_p(buffer, offset) == NULL) {
+ return ThrowException(Exception::TypeError(String::New(
+ "Offset is out of bounds")));
+ }
+
+ size_t toWrite = args[2]->Int32Value();
+
+ if (buffer_remaining(buffer, offset) < toWrite) {
+ return ThrowException(Exception::TypeError(String::New(
+ "Length is out of bounds")));
+ }
+
+ // TODO Expose the second argument of WriteAscii?
+ // Could avoid doing slices when the string doesn't fit in a buffer. V8
+ // slice() does copy the string, so exposing that argument would help.
+
+ int written = s->WriteAscii(p, 0, toWrite);
+
+ return scope.Close(Integer::New(written));
+}
+
+
+static Handle<Value> Utf8Length(const Arguments &args) {
+ HandleScope scope;
+ if (!args[0]->IsString()) {
+ return ThrowException(Exception::TypeError(String::New(
+ "Argument must be a string")));
+ }
+ Local<String> s = args[0]->ToString();
+ return scope.Close(Integer::New(s->Utf8Length()));
+}
+
+
void InitBuffer(Handle<Object> target) {
HandleScope scope;
// copy
NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Slice", Utf8Slice);
+ NODE_SET_PROTOTYPE_METHOD(constructor_template, "utf8Write", Utf8Write);
+ NODE_SET_PROTOTYPE_METHOD(constructor_template, "asciiWrite", AsciiWrite);
+
+ NODE_SET_METHOD(constructor_template->GetFunction(), "utf8Length", Utf8Length);
+
target->Set(String::NewSymbol("Buffer"), constructor_template->GetFunction());
}
var write = process.write;
var toRead = process.toRead;
-var Peer = function (peerInfo) {
+var Stream = function (peerInfo) {
process.EventEmitter.call();
var self = this;
- process.mixin(self, peerInfo);
+ self.fd = peerInfo.fd;
+ self.remoteAddress = peerInfo.remoteAddress;
+ self.remotePort = peerInfo.remotePort;
// Allocated on demand.
self.recvBuffer = null;
+ self.sendQueue = [];
self.readWatcher = new process.IOWatcher(function () {
debug("\n" + self.fd + " readable");
// If this is the first recv (recvBuffer doesn't exist) or we've used up
// most of the recvBuffer, allocate a new one.
if (!self.recvBuffer ||
- self.recvBuffer.length - self.recvBufferBytesUsed < 128) {
+ self.recvBuffer.length - self.recvBuffer.used < 128) {
self._allocateNewRecvBuf();
}
- debug("recvBufferBytesUsed " + self.recvBufferBytesUsed);
+ debug("recvBuffer.used " + self.recvBuffer.used);
var bytesRead = read(self.fd,
self.recvBuffer,
- self.recvBufferBytesUsed,
- self.recvBuffer.length - self.recvBufferBytesUsed);
+ self.recvBuffer.used,
+ self.recvBuffer.length - self.recvBuffer.used);
debug("bytesRead " + bytesRead + "\n");
- if (bytesRead == 0) {
+ if (bytesRead == 0) {
self.readable = false;
self.readWatcher.stop();
self.emit("eof");
} else {
- var slice = self.recvBuffer.slice(self.recvBufferBytesUsed,
- self.recvBufferBytesUsed + bytesRead);
- self.recvBufferBytesUsed += bytesRead;
+ var slice = self.recvBuffer.slice(self.recvBuffer.used,
+ self.recvBuffer.used + bytesRead);
+ self.recvBuffer.used += bytesRead;
self.emit("receive", slice);
}
});
self.readWatcher.start();
self.writeWatcher = new process.IOWatcher(function () {
- debug(self.fd + " writable");
+ self.flush();
});
self.writeWatcher.set(self.fd, false, true);
self.readable = true;
self.writable = true;
-
- self._out = [];
};
-process.inherits(Peer, process.EventEmitter);
+process.inherits(Stream, process.EventEmitter);
-Peer.prototype._allocateNewRecvBuf = function () {
+Stream.prototype._allocateNewRecvBuf = function () {
var self = this;
var newBufferSize = 1024; // TODO make this adjustable from user API
} else if (bytesToRead == 0) {
// Probably getting an EOF - so let's not allocate so much.
if (self.recvBuffer &&
- self.recvBuffer.length - self.recvBufferBytesUsed > 0) {
+ self.recvBuffer.length - self.recvBuffer.used > 0) {
return; // just recv the eof on the old buf.
}
newBufferSize = 128;
}
self.recvBuffer = new process.Buffer(newBufferSize);
- self.recvBufferBytesUsed = 0;
+ self.recvBuffer.used = 0;
+};
+
+Stream.prototype._allocateSendBuffer = function () {
+ var b = new process.Buffer(1024);
+ b.used = 0;
+ b.sent = 0;
+ this.sendQueue.push(b);
+ return b;
+};
+
+Stream.prototype.send = function (data, encoding) {
+ var self = this;
+ if (typeof(data) == "string") {
+ var buffer;
+ if (self.sendQueue.length == 0) {
+ buffer = self._allocateSendBuffer();
+ } else {
+ // walk through the sendQueue, find the first empty buffer
+ for (var i = 0; i < self.sendQueue.length; i++) {
+ if (self.sendQueue[i].used == 0) {
+ buffer = self.sendQueue[i];
+ break;
+ }
+ }
+ // if we didn't find one, take the last
+ if (!buffer) {
+ buffer = self.sendQueue[self.sendQueue.length-1];
+ // if last buffer is empty
+ if (buffer.length == buffer.used) buffer = self._allocateSendBuffer();
+ }
+ }
+
+ encoding = encoding || "ascii"; // default to ascii since it's faster
+
+ var charsWritten;
+
+ if (encoding.toLowerCase() == "utf8") {
+ charsWritten = buffer.utf8Write(data,
+ buffer.used,
+ buffer.length - buffer.used);
+ buffer.used += process.Buffer.utf8Length(data.slice(0, charsWritten));
+ } else {
+ // ascii
+ charsWritten = buffer.asciiWrite(data,
+ buffer.used,
+ buffer.length - buffer.used);
+ buffer.used += charsWritten;
+ debug("ascii charsWritten " + charsWritten);
+ debug("ascii buffer.used " + buffer.used);
+ }
+
+
+ // If we didn't finish, then recurse with the rest of the string.
+ if (charsWritten < data.length) {
+ debug("recursive send");
+ self.send(data.slice(charsWritten), encoding);
+ }
+ } else {
+ // data is a process.Buffer
+
+ // walk through the sendQueue, find the first empty buffer
+ var inserted = false;
+ data.sent = 0;
+ data.used = data.length;
+ for (var i = 0; i < self.sendQueue.length; i++) {
+ if (self.sendQueue[i].used == 0) {
+ // if found, insert the data there
+ self.sendQueue.splice(i, 0, data);
+ inserted = true;
+ break;
+ }
+ }
+
+ if (!inserted) self.sendQueue.push(data);
+ }
+ this.flush();
+};
+
+// returns true if flushed without getting EAGAIN
+// false if it got EAGAIN
+Stream.prototype.flush = function () {
+ var self = this;
+ var bytesWritten;
+ while (self.sendQueue.length > 0) {
+ var b = self.sendQueue[0];
+
+ if (b.sent == b.used) {
+ // this can be improved - save the buffer for later?
+ self.sendQueue.shift()
+ continue;
+ }
+
+ bytesWritten = write(self.fd,
+ b,
+ b.sent,
+ b.used - b.sent);
+ if (bytesWritten === null) {
+ this.writeWatcher.start();
+ return false;
+ }
+ b.sent += bytesWritten;
+ debug("bytes sent: " + b.sent);
+ }
+ this.writeWatcher.stop();
+ return true;
};
-Peer.prototype.close = function () {
+Stream.prototype.close = function () {
this.readable = false;
this.writable = false;
}
self.watcher = new process.IOWatcher(function (readable, writeable) {
- debug("readable " + readable);
- debug("writable " + writeable);
while (self.fd) {
- debug("accept from " + self.fd);
var peerInfo = accept(self.fd);
debug("accept: " + JSON.stringify(peerInfo));
if (!peerInfo) return;
- var peer = new Peer(peerInfo);
+ var peer = new Stream(peerInfo);
self.emit("connection", peer);
}
});
if (self.fd) throw new Error("Already running");
+ var backlogIndex;
if (typeof(arguments[0]) == "string" && arguments.length == 1) {
// the first argument specifies a path
self.fd = process.socket("UNIX");
// unlink(SOCKFILE);
// }
bind(self.fd, arguments[0]);
+ backlogIndex = 1;
} else {
// the first argument is the port, the second an IP
self.fd = process.socket("TCP");
// TODO dns resolution on arguments[1]
bind(self.fd, arguments[0], arguments[1]);
+ backlogIndex = typeof(arguments[1]) == "string" ? 2 : 1;
}
- listen(self.fd, 128); // TODO configurable backlog
+ listen(self.fd, arguments[backlogIndex] ? arguments[backlogIndex] : 128);
self.watcher.set(self.fd, true, false);
self.watcher.start();
peer.addListener("receive", function (b) {
sys.puts("recv (" + b.length + "): " + b);
+ peer.send("pong\r\n");
});
});
//server.listen(8000);