1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "nacl_io/ossocket.h"
6 #ifdef PROVIDES_SOCKET_API
13 #include "nacl_io/kernel_handle.h"
14 #include "nacl_io/log.h"
15 #include "nacl_io/pepper_interface.h"
16 #include "nacl_io/socket/tcp_node.h"
17 #include "nacl_io/stream/stream_fs.h"
20 const size_t kMaxPacketSize = 65536;
21 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
26 class TcpWork : public StreamFs::Work {
28 explicit TcpWork(const ScopedTcpEventEmitter& emitter)
29 : StreamFs::Work(emitter->stream()->stream()),
37 TCPSocketInterface* TCPInterface() {
38 return filesystem()->ppapi()->GetTCPSocketInterface();
42 ScopedTcpEventEmitter emitter_;
46 class TcpSendWork : public TcpWork {
48 explicit TcpSendWork(const ScopedTcpEventEmitter& emitter,
49 const ScopedSocketNode& stream)
50 : TcpWork(emitter), node_(stream) {}
52 virtual bool Start(int32_t val) {
53 AUTO_LOCK(emitter_->GetLock());
55 // Does the stream exist, and can it send?
56 if (!node_->TestStreamFlags(SSF_CAN_SEND))
59 // Check if we are already sending.
60 if (node_->TestStreamFlags(SSF_SENDING))
63 size_t tx_data_avail = emitter_->BytesInOutputFIFO();
64 int capped_len = std::min(tx_data_avail, kMaxPacketSize);
68 data_ = (char*)malloc(capped_len);
72 emitter_->ReadOut_Locked(data_, capped_len);
74 int err = TCPInterface()->Write(node_->socket_resource(),
77 filesystem()->GetRunCompletion(this));
79 if (err != PP_OK_COMPLETIONPENDING) {
80 // Anything else, we should assume the socket has gone bad.
81 node_->SetError_Locked(err);
85 node_->SetStreamFlags(SSF_SENDING);
89 virtual void Run(int32_t length_error) {
90 AUTO_LOCK(emitter_->GetLock());
92 if (length_error < 0) {
93 // Send failed, mark the socket as bad
94 node_->SetError_Locked(length_error);
98 // If we did send, then Q more work.
99 node_->ClearStreamFlags(SSF_SENDING);
100 node_->QueueOutput();
104 // We assume that transmits will always complete. If the upstream
105 // actually back pressures, enough to prevent the Send callback
106 // from triggering, this resource may never go away.
107 ScopedSocketNode node_;
110 class TcpRecvWork : public TcpWork {
112 explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter)
113 : TcpWork(emitter) {}
115 virtual bool Start(int32_t val) {
116 AUTO_LOCK(emitter_->GetLock());
117 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
119 // Does the stream exist, and can it recv?
120 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
123 // If we are not currently receiving
124 if (stream->TestStreamFlags(SSF_RECVING))
127 size_t rx_space_avail = emitter_->SpaceInInputFIFO();
129 static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
134 data_ = (char*)malloc(capped_len);
138 int err = TCPInterface()->Read(stream->socket_resource(),
141 filesystem()->GetRunCompletion(this));
142 if (err != PP_OK_COMPLETIONPENDING) {
143 // Anything else, we should assume the socket has gone bad.
144 stream->SetError_Locked(err);
148 stream->SetStreamFlags(SSF_RECVING);
152 virtual void Run(int32_t length_error) {
153 AUTO_LOCK(emitter_->GetLock());
154 TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
159 if (length_error <= 0) {
160 stream->SetError_Locked(length_error);
164 // If we successfully received, queue more input
165 emitter_->WriteIn_Locked(data_, length_error);
166 stream->ClearStreamFlags(SSF_RECVING);
167 stream->QueueInput();
171 class TCPAcceptWork : public StreamFs::Work {
173 explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter)
174 : StreamFs::Work(stream), emitter_(emitter) {}
176 TCPSocketInterface* TCPInterface() {
177 return filesystem()->ppapi()->GetTCPSocketInterface();
180 virtual bool Start(int32_t val) {
181 AUTO_LOCK(emitter_->GetLock());
182 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
184 // Does the stream exist, and can it accept?
188 // If we are not currently accepting
189 if (!node->TestStreamFlags(SSF_LISTENING))
192 int err = TCPInterface()->Accept(node->socket_resource(),
194 filesystem()->GetRunCompletion(this));
196 if (err != PP_OK_COMPLETIONPENDING) {
197 // Anything else, we should assume the socket has gone bad.
198 node->SetError_Locked(err);
205 virtual void Run(int32_t error) {
206 AUTO_LOCK(emitter_->GetLock());
207 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
212 if (error != PP_OK) {
213 node->SetError_Locked(error);
217 emitter_->SetAcceptedSocket_Locked(new_socket_);
221 PP_Resource new_socket_;
222 ScopedTcpEventEmitter emitter_;
225 class TCPConnectWork : public StreamFs::Work {
227 explicit TCPConnectWork(StreamFs* stream,
228 const ScopedTcpEventEmitter& emitter)
229 : StreamFs::Work(stream), emitter_(emitter) {}
231 TCPSocketInterface* TCPInterface() {
232 return filesystem()->ppapi()->GetTCPSocketInterface();
235 virtual bool Start(int32_t val) {
236 AUTO_LOCK(emitter_->GetLock());
237 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
239 // Does the stream exist, and can it connect?
243 int err = TCPInterface()->Connect(node->socket_resource(),
245 filesystem()->GetRunCompletion(this));
246 if (err != PP_OK_COMPLETIONPENDING) {
247 // Anything else, we should assume the socket has gone bad.
248 node->SetError_Locked(err);
255 virtual void Run(int32_t error) {
256 AUTO_LOCK(emitter_->GetLock());
257 TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
262 if (error != PP_OK) {
263 node->ConnectFailed_Locked();
264 node->SetError_Locked(error);
268 node->ConnectDone_Locked();
272 ScopedTcpEventEmitter emitter_;
275 TcpNode::TcpNode(Filesystem* filesystem)
276 : SocketNode(filesystem),
277 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
278 tcp_nodelay_(false) {
279 emitter_->AttachStream(this);
282 TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket)
283 : SocketNode(filesystem, socket),
284 emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
285 tcp_nodelay_(false) {
286 emitter_->AttachStream(this);
289 void TcpNode::Destroy() {
290 emitter_->DetachStream();
291 SocketNode::Destroy();
294 Error TcpNode::Init(int open_flags) {
295 Error err = SocketNode::Init(open_flags);
299 if (TCPInterface() == NULL) {
300 LOG_ERROR("Got NULL interface: TCP");
304 if (socket_resource_ != 0) {
305 // TCP sockets that are contructed with an existing socket_resource_
306 // are those that generated from calls to Accept() and therefore are
307 // already connected.
308 remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
309 ConnectDone_Locked();
312 TCPInterface()->Create(filesystem_->ppapi()->GetInstance());
313 if (0 == socket_resource_) {
314 LOG_ERROR("Unable to create TCP resource.");
317 SetStreamFlags(SSF_CAN_CONNECT);
323 EventEmitter* TcpNode::GetEventEmitter() {
324 return emitter_.get();
327 void TcpNode::SetError_Locked(int pp_error_num) {
328 SocketNode::SetError_Locked(pp_error_num);
329 emitter_->SetError_Locked();
332 Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) {
333 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
334 AUTO_LOCK(node_lock_);
335 int value = tcp_nodelay_;
336 socklen_t value_len = static_cast<socklen_t>(sizeof(value));
337 int copy_bytes = std::min(value_len, *len);
338 memcpy(optval, &value, copy_bytes);
343 return SocketNode::GetSockOpt(lvl, optname, optval, len);
346 Error TcpNode::SetNoDelay_Locked() {
351 TCPInterface()->SetOption(socket_resource_,
352 PP_TCPSOCKET_OPTION_NO_DELAY,
353 PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
354 PP_BlockUntilComplete());
355 return PPErrorToErrno(error);
358 Error TcpNode::SetSockOpt(int lvl,
362 if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
363 if (static_cast<size_t>(len) < sizeof(int))
365 AUTO_LOCK(node_lock_);
366 tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
367 return SetNoDelay_Locked();
370 return SocketNode::SetSockOpt(lvl, optname, optval, len);
373 void TcpNode::QueueAccept() {
374 StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_);
375 stream()->EnqueueWork(work);
378 void TcpNode::QueueConnect() {
379 StreamFs::Work* work = new TCPConnectWork(stream(), emitter_);
380 stream()->EnqueueWork(work);
383 void TcpNode::QueueInput() {
384 StreamFs::Work* work = new TcpRecvWork(emitter_);
385 stream()->EnqueueWork(work);
388 void TcpNode::QueueOutput() {
389 if (TestStreamFlags(SSF_SENDING))
392 if (!TestStreamFlags(SSF_CAN_SEND))
395 if (0 == emitter_->BytesInOutputFIFO())
398 StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this));
399 stream()->EnqueueWork(work);
402 Error TcpNode::Accept(const HandleAttr& attr,
403 PP_Resource* out_sock,
404 struct sockaddr* addr,
406 EventListenerLock wait(GetEventEmitter());
408 if (!TestStreamFlags(SSF_LISTENING))
411 // Either block forever or not at all
412 int ms = attr.IsBlocking() ? -1 : 0;
414 Error err = wait.WaitOnEvent(POLLIN, ms);
415 if (ETIMEDOUT == err)
418 int s = emitter_->GetAcceptedSocket_Locked();
419 // Non-blocking case.
423 // Consume the new socket and start listening for the next one
425 emitter_->ClearEvents_Locked(POLLIN);
427 // Set the out paramaters
428 PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
429 *len = ResourceToSockAddr(remote_addr, *len, addr);
430 filesystem_->ppapi()->ReleaseResource(remote_addr);
436 // We can not bind a client socket with PPAPI. For now we ignore the
437 // bind but report the correct address later, just in case someone is
438 // binding without really caring what the address is (for example to
439 // select a more optimized interface/route.)
440 Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) {
441 AUTO_LOCK(node_lock_);
443 /* Only bind once. */
447 local_addr_ = SockAddrToResource(addr, len);
448 int err = TCPInterface()->Bind(
449 socket_resource_, local_addr_, PP_BlockUntilComplete());
451 // If we fail, release the local addr resource
453 filesystem_->ppapi()->ReleaseResource(local_addr_);
455 return PPErrorToErrno(err);
461 Error TcpNode::Connect(const HandleAttr& attr,
462 const struct sockaddr* addr,
464 EventListenerLock wait(GetEventEmitter());
466 if (TestStreamFlags(SSF_CONNECTING))
472 remote_addr_ = SockAddrToResource(addr, len);
473 if (0 == remote_addr_)
476 int ms = attr.IsBlocking() ? -1 : 0;
478 SetStreamFlags(SSF_CONNECTING);
481 Error err = wait.WaitOnEvent(POLLOUT, ms);
482 if (ETIMEDOUT == err)
485 // If we fail, release the dest addr resource
487 ConnectFailed_Locked();
491 // Make sure the connection succeeded.
492 if (last_errno_ != 0) {
493 ConnectFailed_Locked();
497 ConnectDone_Locked();
501 Error TcpNode::Shutdown(int how) {
502 AUTO_LOCK(node_lock_);
507 AUTO_LOCK(emitter_->GetLock());
508 emitter_->SetError_Locked();
513 void TcpNode::ConnectDone_Locked() {
514 local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
516 // Now that we are connected, we can start sending and receiving.
517 ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
518 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
520 emitter_->ConnectDone_Locked();
522 // The NODELAY option cannot be set in PPAPI before the socket
523 // is connected, but setsockopt() might have already set it.
526 // Begin the input pump
530 void TcpNode::ConnectFailed_Locked() {
531 filesystem_->ppapi()->ReleaseResource(remote_addr_);
535 Error TcpNode::Listen(int backlog) {
536 AUTO_LOCK(node_lock_);
540 int err = TCPInterface()->Listen(
541 socket_resource_, backlog, PP_BlockUntilComplete());
543 return PPErrorToErrno(err);
545 ClearStreamFlags(SSF_CAN_CONNECT);
546 SetStreamFlags(SSF_LISTENING);
547 emitter_->SetListening_Locked();
552 Error TcpNode::Recv_Locked(void* buf,
554 PP_Resource* out_addr,
556 assert(emitter_.get());
557 *out_len = emitter_->ReadIn_Locked((char*)buf, len);
558 *out_addr = remote_addr_;
560 // Ref the address copy we pass back.
561 filesystem_->ppapi()->AddRefResource(remote_addr_);
565 // TCP ignores dst addr passed to send_to, and always uses bound address
566 Error TcpNode::Send_Locked(const void* buf,
570 assert(emitter_.get());
571 if (emitter_->GetError_Locked())
573 *out_len = emitter_->WriteOut_Locked((char*)buf, len);
577 } // namespace nacl_io
579 #endif // PROVIDES_SOCKET_API