1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "nacl_io/ossocket.h"
6 #ifdef PROVIDES_SOCKET_API
13 #include "nacl_io/kernel_handle.h"
14 #include "nacl_io/pepper_interface.h"
15 #include "nacl_io/socket/tcp_node.h"
16 #include "nacl_io/stream/stream_fs.h"
19 const size_t kMaxPacketSize = 65536;
20 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
25 class TcpWork : public StreamFs::Work {
27 explicit TcpWork(const ScopedTcpEventEmitter& emitter)
28 : StreamFs::Work(emitter->stream()->stream()),
32 ~TcpWork() { delete[] data_; }
34 TCPSocketInterface* TCPInterface() {
35 return filesystem()->ppapi()->GetTCPSocketInterface();
39 ScopedTcpEventEmitter emitter_;
43 class TcpSendWork : public TcpWork {
45 explicit TcpSendWork(const ScopedTcpEventEmitter& emitter,
46 const ScopedSocketNode& stream)
47 : TcpWork(emitter), node_(stream) {}
49 virtual bool Start(int32_t val) {
50 AUTO_LOCK(emitter_->GetLock());
52 // Does the stream exist, and can it send?
53 if (!node_->TestStreamFlags(SSF_CAN_SEND))
56 // Check if we are already sending.
57 if (node_->TestStreamFlags(SSF_SENDING))
60 size_t tx_data_avail = emitter_->BytesInOutputFIFO();
61 int capped_len = std::min(tx_data_avail, kMaxPacketSize);
65 data_ = new char[capped_len];
66 emitter_->ReadOut_Locked(data_, capped_len);
68 int err = TCPInterface()->Write(node_->socket_resource(),
71 filesystem()->GetRunCompletion(this));
73 if (err != PP_OK_COMPLETIONPENDING) {
74 // Anything else, we should assume the socket has gone bad.
75 node_->SetError_Locked(err);
79 node_->SetStreamFlags(SSF_SENDING);
83 virtual void Run(int32_t length_error) {
84 AUTO_LOCK(emitter_->GetLock());
86 if (length_error < 0) {
87 // Send failed, mark the socket as bad
88 node_->SetError_Locked(length_error);
92 // If we did send, then Q more work.
93 node_->ClearStreamFlags(SSF_SENDING);
98 // We assume that transmits will always complete. If the upstream
99 // actually back pressures, enough to prevent the Send callback
100 // from triggering, this resource may never go away.
101 ScopedSocketNode node_;
104 class TcpRecvWork : public TcpWork {
106 explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter)
107 : TcpWork(emitter) {}
109 virtual bool Start(int32_t val) {
110 AUTO_LOCK(emitter_->GetLock());
111 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
113 // Does the stream exist, and can it recv?
114 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
117 // If we are not currently receiving
118 if (stream->TestStreamFlags(SSF_RECVING))
121 size_t rx_space_avail = emitter_->SpaceInInputFIFO();
123 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
128 data_ = new char[capped_len];
129 int err = TCPInterface()->Read(stream->socket_resource(),
132 filesystem()->GetRunCompletion(this));
133 if (err != PP_OK_COMPLETIONPENDING) {
134 // Anything else, we should assume the socket has gone bad.
135 stream->SetError_Locked(err);
139 stream->SetStreamFlags(SSF_RECVING);
143 virtual void Run(int32_t length_error) {
144 AUTO_LOCK(emitter_->GetLock());
145 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
150 if (length_error <= 0) {
151 stream->SetError_Locked(length_error);
155 // If we successfully received, queue more input
156 emitter_->WriteIn_Locked(data_, length_error);
157 stream->ClearStreamFlags(SSF_RECVING);
158 stream->QueueInput();
162 class TCPAcceptWork : public StreamFs::Work {
164 explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter)
165 : StreamFs::Work(stream), emitter_(emitter) {}
167 TCPSocketInterface* TCPInterface() {
168 return filesystem()->ppapi()->GetTCPSocketInterface();
171 virtual bool Start(int32_t val) {
172 AUTO_LOCK(emitter_->GetLock());
173 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
175 // Does the stream exist, and can it accept?
179 // If we are not currently accepting
180 if (!node->TestStreamFlags(SSF_LISTENING))
183 int err = TCPInterface()->Accept(node->socket_resource(),
185 filesystem()->GetRunCompletion(this));
187 if (err != PP_OK_COMPLETIONPENDING) {
188 // Anything else, we should assume the socket has gone bad.
189 node->SetError_Locked(err);
196 virtual void Run(int32_t error) {
197 AUTO_LOCK(emitter_->GetLock());
198 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
203 if (error != PP_OK) {
204 node->SetError_Locked(error);
208 emitter_->SetAcceptedSocket_Locked(new_socket_);
212 PP_Resource new_socket_;
213 ScopedTcpEventEmitter emitter_;
216 class TCPConnectWork : public StreamFs::Work {
218 explicit TCPConnectWork(StreamFs* stream,
219 const ScopedTcpEventEmitter& emitter)
220 : StreamFs::Work(stream), emitter_(emitter) {}
222 TCPSocketInterface* TCPInterface() {
223 return filesystem()->ppapi()->GetTCPSocketInterface();
226 virtual bool Start(int32_t val) {
227 AUTO_LOCK(emitter_->GetLock());
228 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
230 // Does the stream exist, and can it connect?
234 int err = TCPInterface()->Connect(node->socket_resource(),
236 filesystem()->GetRunCompletion(this));
237 if (err != PP_OK_COMPLETIONPENDING) {
238 // Anything else, we should assume the socket has gone bad.
239 node->SetError_Locked(err);
246 virtual void Run(int32_t error) {
247 AUTO_LOCK(emitter_->GetLock());
248 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
253 if (error != PP_OK) {
254 node->ConnectFailed_Locked();
255 node->SetError_Locked(error);
259 node->ConnectDone_Locked();
263 ScopedTcpEventEmitter emitter_;
266 TcpNode::TcpNode(Filesystem* filesystem)
267 : SocketNode(filesystem),
268 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
269 tcp_nodelay_(false) {
270 emitter_->AttachStream(this);
273 TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket)
274 : SocketNode(filesystem, socket),
275 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
276 tcp_nodelay_(false) {
277 emitter_->AttachStream(this);
280 void TcpNode::Destroy() {
281 emitter_->DetachStream();
282 SocketNode::Destroy();
285 Error TcpNode::Init(int open_flags) {
286 Error err = SocketNode::Init(open_flags);
290 if (TCPInterface() == NULL)
293 if (socket_resource_ != 0) {
294 // TCP sockets that are contructed with an existing socket_resource_
295 // are those that generated from calls to Accept() and therefore are
296 // already connected.
297 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
298 ConnectDone_Locked();
301 TCPInterface()->Create(filesystem_->ppapi()->GetInstance());
302 if (0 == socket_resource_)
304 SetStreamFlags(SSF_CAN_CONNECT);
310 EventEmitter* TcpNode::GetEventEmitter() { return emitter_.get(); }
312 void TcpNode::SetError_Locked(int pp_error_num) {
313 SocketNode::SetError_Locked(pp_error_num);
314 emitter_->SetError_Locked();
317 Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) {
318 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
319 AUTO_LOCK(node_lock_);
320 int value = tcp_nodelay_;
321 socklen_t value_len = static_cast<socklen_t>(sizeof(value));
322 int copy_bytes = std::min(value_len, *len);
323 memcpy(optval, &value, copy_bytes);
328 return SocketNode::GetSockOpt(lvl, optname, optval, len);
331 Error TcpNode::SetNoDelay_Locked() {
336 TCPInterface()->SetOption(socket_resource_,
337 PP_TCPSOCKET_OPTION_NO_DELAY,
338 PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
339 PP_BlockUntilComplete());
340 return PPErrorToErrno(error);
343 Error TcpNode::SetSockOpt(int lvl,
347 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
348 if (static_cast<size_t>(len) < sizeof(int))
350 AUTO_LOCK(node_lock_);
351 tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
352 return SetNoDelay_Locked();
355 return SocketNode::SetSockOpt(lvl, optname, optval, len);
358 void TcpNode::QueueAccept() {
359 StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_);
360 stream()->EnqueueWork(work);
363 void TcpNode::QueueConnect() {
364 StreamFs::Work* work = new TCPConnectWork(stream(), emitter_);
365 stream()->EnqueueWork(work);
368 void TcpNode::QueueInput() {
369 StreamFs::Work* work = new TcpRecvWork(emitter_);
370 stream()->EnqueueWork(work);
373 void TcpNode::QueueOutput() {
374 if (TestStreamFlags(SSF_SENDING))
377 if (!TestStreamFlags(SSF_CAN_SEND))
380 if (0 == emitter_->BytesInOutputFIFO())
383 StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this));
384 stream()->EnqueueWork(work);
387 Error TcpNode::Accept(const HandleAttr& attr,
388 PP_Resource* out_sock,
389 struct sockaddr* addr,
391 EventListenerLock wait(GetEventEmitter());
393 if (!TestStreamFlags(SSF_LISTENING))
396 // Either block forever or not at all
397 int ms = attr.IsBlocking() ? -1 : 0;
399 Error err = wait.WaitOnEvent(POLLIN, ms);
400 if (ETIMEDOUT == err)
403 int s = emitter_->GetAcceptedSocket_Locked();
404 // Non-blocking case.
408 // Consume the new socket and start listening for the next one
410 emitter_->ClearEvents_Locked(POLLIN);
412 // Set the out paramaters
413 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
414 *len = ResourceToSockAddr(remote_addr, *len, addr);
415 filesystem_->ppapi()->ReleaseResource(remote_addr);
421 // We can not bind a client socket with PPAPI. For now we ignore the
422 // bind but report the correct address later, just in case someone is
423 // binding without really caring what the address is (for example to
424 // select a more optimized interface/route.)
425 Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) {
426 AUTO_LOCK(node_lock_);
428 /* Only bind once. */
432 local_addr_ = SockAddrToResource(addr, len);
433 int err = TCPInterface()->Bind(
434 socket_resource_, local_addr_, PP_BlockUntilComplete());
436 // If we fail, release the local addr resource
438 filesystem_->ppapi()->ReleaseResource(local_addr_);
440 return PPErrorToErrno(err);
446 Error TcpNode::Connect(const HandleAttr& attr,
447 const struct sockaddr* addr,
449 EventListenerLock wait(GetEventEmitter());
451 if (TestStreamFlags(SSF_CONNECTING))
457 remote_addr_ = SockAddrToResource(addr, len);
458 if (0 == remote_addr_)
461 int ms = attr.IsBlocking() ? -1 : 0;
463 SetStreamFlags(SSF_CONNECTING);
466 Error err = wait.WaitOnEvent(POLLOUT, ms);
467 if (ETIMEDOUT == err)
470 // If we fail, release the dest addr resource
472 ConnectFailed_Locked();
476 ConnectDone_Locked();
480 Error TcpNode::Shutdown(int how) {
481 AUTO_LOCK(node_lock_);
485 AUTO_LOCK(emitter_->GetLock());
486 emitter_->SetError_Locked();
491 void TcpNode::ConnectDone_Locked() {
492 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
494 // Now that we are connected, we can start sending and receiving.
495 ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
496 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
498 emitter_->ConnectDone_Locked();
500 // The NODELAY option cannot be set in PPAPI before the socket
501 // is connected, but setsockopt() might have already set it.
504 // Begin the input pump
508 void TcpNode::ConnectFailed_Locked() {
509 filesystem_->ppapi()->ReleaseResource(remote_addr_);
513 Error TcpNode::Listen(int backlog) {
514 AUTO_LOCK(node_lock_);
518 int err = TCPInterface()->Listen(
519 socket_resource_, backlog, PP_BlockUntilComplete());
521 return PPErrorToErrno(err);
523 ClearStreamFlags(SSF_CAN_CONNECT);
524 SetStreamFlags(SSF_LISTENING);
525 emitter_->SetListening_Locked();
530 Error TcpNode::Recv_Locked(void* buf,
532 PP_Resource* out_addr,
534 assert(emitter_.get());
535 *out_len = emitter_->ReadIn_Locked((char*)buf, len);
536 *out_addr = remote_addr_;
538 // Ref the address copy we pass back.
539 filesystem_->ppapi()->AddRefResource(remote_addr_);
543 // TCP ignores dst addr passed to send_to, and always uses bound address
544 Error TcpNode::Send_Locked(const void* buf,
548 assert(emitter_.get());
549 if (emitter_->GetError_Locked())
551 *out_len = emitter_->WriteOut_Locked((char*)buf, len);
555 } // namespace nacl_io
557 #endif // PROVIDES_SOCKET_API