2066c712cd22b33531b0248740403a1d5355b3d3
[platform/framework/web/crosswalk.git] / src / native_client_sdk / src / libraries / nacl_io / socket / tcp_node.cc
1 // Copyright 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "nacl_io/ossocket.h"
6 #ifdef PROVIDES_SOCKET_API
7
8 #include <assert.h>
9 #include <errno.h>
10 #include <string.h>
11 #include <algorithm>
12
13 #include "nacl_io/kernel_handle.h"
14 #include "nacl_io/log.h"
15 #include "nacl_io/pepper_interface.h"
16 #include "nacl_io/socket/tcp_node.h"
17 #include "nacl_io/stream/stream_fs.h"
18
19 namespace {
20 const size_t kMaxPacketSize = 65536;
21 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
22 }
23
24 namespace nacl_io {
25
26 class TcpWork : public StreamFs::Work {
27  public:
28   explicit TcpWork(const ScopedTcpEventEmitter& emitter)
29       : StreamFs::Work(emitter->stream()->stream()),
30         emitter_(emitter),
31         data_(NULL) {}
32
33   ~TcpWork() {
34     free(data_);
35   }
36
37   TCPSocketInterface* TCPInterface() {
38     return filesystem()->ppapi()->GetTCPSocketInterface();
39   }
40
41  protected:
42   ScopedTcpEventEmitter emitter_;
43   char* data_;
44 };
45
46 class TcpSendWork : public TcpWork {
47  public:
48   explicit TcpSendWork(const ScopedTcpEventEmitter& emitter,
49                        const ScopedSocketNode& stream)
50       : TcpWork(emitter), node_(stream) {}
51
52   virtual bool Start(int32_t val) {
53     AUTO_LOCK(emitter_->GetLock());
54
55     // Does the stream exist, and can it send?
56     if (!node_->TestStreamFlags(SSF_CAN_SEND))
57       return false;
58
59     // Check if we are already sending.
60     if (node_->TestStreamFlags(SSF_SENDING))
61       return false;
62
63     size_t tx_data_avail = emitter_->BytesInOutputFIFO();
64     int capped_len = std::min(tx_data_avail, kMaxPacketSize);
65     if (capped_len == 0)
66       return false;
67
68     data_ = (char*)malloc(capped_len);
69     assert(data_);
70     if (data_ == NULL)
71       return false;
72     emitter_->ReadOut_Locked(data_, capped_len);
73
74     int err = TCPInterface()->Write(node_->socket_resource(),
75                                     data_,
76                                     capped_len,
77                                     filesystem()->GetRunCompletion(this));
78
79     if (err != PP_OK_COMPLETIONPENDING) {
80       // Anything else, we should assume the socket has gone bad.
81       node_->SetError_Locked(err);
82       return false;
83     }
84
85     node_->SetStreamFlags(SSF_SENDING);
86     return true;
87   }
88
89   virtual void Run(int32_t length_error) {
90     AUTO_LOCK(emitter_->GetLock());
91
92     if (length_error < 0) {
93       // Send failed, mark the socket as bad
94       node_->SetError_Locked(length_error);
95       return;
96     }
97
98     // If we did send, then Q more work.
99     node_->ClearStreamFlags(SSF_SENDING);
100     node_->QueueOutput();
101   }
102
103  private:
104   // We assume that transmits will always complete.  If the upstream
105   // actually back pressures, enough to prevent the Send callback
106   // from triggering, this resource may never go away.
107   ScopedSocketNode node_;
108 };
109
110 class TcpRecvWork : public TcpWork {
111  public:
112   explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter)
113       : TcpWork(emitter) {}
114
115   virtual bool Start(int32_t val) {
116     AUTO_LOCK(emitter_->GetLock());
117     TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
118
119     // Does the stream exist, and can it recv?
120     if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
121       return false;
122
123     // If we are not currently receiving
124     if (stream->TestStreamFlags(SSF_RECVING))
125       return false;
126
127     size_t rx_space_avail = emitter_->SpaceInInputFIFO();
128     int capped_len =
129         static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));
130
131     if (capped_len == 0)
132       return false;
133
134     data_ = (char*)malloc(capped_len);
135     assert(data_);
136     if (data_ == NULL)
137       return false;
138     int err = TCPInterface()->Read(stream->socket_resource(),
139                                    data_,
140                                    capped_len,
141                                    filesystem()->GetRunCompletion(this));
142     if (err != PP_OK_COMPLETIONPENDING) {
143       // Anything else, we should assume the socket has gone bad.
144       stream->SetError_Locked(err);
145       return false;
146     }
147
148     stream->SetStreamFlags(SSF_RECVING);
149     return true;
150   }
151
152   virtual void Run(int32_t length_error) {
153     AUTO_LOCK(emitter_->GetLock());
154     TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());
155
156     if (!stream)
157       return;
158
159     if (length_error <= 0) {
160       stream->SetError_Locked(length_error);
161       return;
162     }
163
164     // If we successfully received, queue more input
165     emitter_->WriteIn_Locked(data_, length_error);
166     stream->ClearStreamFlags(SSF_RECVING);
167     stream->QueueInput();
168   }
169 };
170
171 class TCPAcceptWork : public StreamFs::Work {
172  public:
173   explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter)
174       : StreamFs::Work(stream), emitter_(emitter) {}
175
176   TCPSocketInterface* TCPInterface() {
177     return filesystem()->ppapi()->GetTCPSocketInterface();
178   }
179
180   virtual bool Start(int32_t val) {
181     AUTO_LOCK(emitter_->GetLock());
182     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
183
184     // Does the stream exist, and can it accept?
185     if (NULL == node)
186       return false;
187
188     // If we are not currently accepting
189     if (!node->TestStreamFlags(SSF_LISTENING))
190       return false;
191
192     int err = TCPInterface()->Accept(node->socket_resource(),
193                                      &new_socket_,
194                                      filesystem()->GetRunCompletion(this));
195
196     if (err != PP_OK_COMPLETIONPENDING) {
197       // Anything else, we should assume the socket has gone bad.
198       node->SetError_Locked(err);
199       return false;
200     }
201
202     return true;
203   }
204
205   virtual void Run(int32_t error) {
206     AUTO_LOCK(emitter_->GetLock());
207     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
208
209     if (node == NULL)
210       return;
211
212     if (error != PP_OK) {
213       node->SetError_Locked(error);
214       return;
215     }
216
217     emitter_->SetAcceptedSocket_Locked(new_socket_);
218   }
219
220  protected:
221   PP_Resource new_socket_;
222   ScopedTcpEventEmitter emitter_;
223 };
224
225 class TCPConnectWork : public StreamFs::Work {
226  public:
227   explicit TCPConnectWork(StreamFs* stream,
228                           const ScopedTcpEventEmitter& emitter)
229       : StreamFs::Work(stream), emitter_(emitter) {}
230
231   TCPSocketInterface* TCPInterface() {
232     return filesystem()->ppapi()->GetTCPSocketInterface();
233   }
234
235   virtual bool Start(int32_t val) {
236     AUTO_LOCK(emitter_->GetLock());
237     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
238
239     // Does the stream exist, and can it connect?
240     if (NULL == node)
241       return false;
242
243     int err = TCPInterface()->Connect(node->socket_resource(),
244                                       node->remote_addr(),
245                                       filesystem()->GetRunCompletion(this));
246     if (err != PP_OK_COMPLETIONPENDING) {
247       // Anything else, we should assume the socket has gone bad.
248       node->SetError_Locked(err);
249       return false;
250     }
251
252     return true;
253   }
254
255   virtual void Run(int32_t error) {
256     AUTO_LOCK(emitter_->GetLock());
257     TcpNode* node = static_cast<TcpNode*>(emitter_->stream());
258
259     if (node == NULL)
260       return;
261
262     if (error != PP_OK) {
263       node->ConnectFailed_Locked();
264       node->SetError_Locked(error);
265       return;
266     }
267
268     node->ConnectDone_Locked();
269   }
270
271  protected:
272   ScopedTcpEventEmitter emitter_;
273 };
274
275 TcpNode::TcpNode(Filesystem* filesystem)
276     : SocketNode(filesystem),
277       emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
278       tcp_nodelay_(false) {
279   emitter_->AttachStream(this);
280 }
281
282 TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket)
283     : SocketNode(filesystem, socket),
284       emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
285       tcp_nodelay_(false) {
286   emitter_->AttachStream(this);
287 }
288
289 void TcpNode::Destroy() {
290   emitter_->DetachStream();
291   SocketNode::Destroy();
292 }
293
294 Error TcpNode::Init(int open_flags) {
295   Error err = SocketNode::Init(open_flags);
296   if (err != 0)
297     return err;
298
299   if (TCPInterface() == NULL) {
300     LOG_ERROR("Got NULL interface: TCP");
301     return EACCES;
302   }
303
304   if (socket_resource_ != 0) {
305     // TCP sockets that are contructed with an existing socket_resource_
306     // are those that generated from calls to Accept() and therefore are
307     // already connected.
308     remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
309     ConnectDone_Locked();
310   } else {
311     socket_resource_ =
312         TCPInterface()->Create(filesystem_->ppapi()->GetInstance());
313     if (0 == socket_resource_) {
314       LOG_ERROR("Unable to create TCP resource.");
315       return EACCES;
316     }
317     SetStreamFlags(SSF_CAN_CONNECT);
318   }
319
320   return 0;
321 }
322
323 EventEmitter* TcpNode::GetEventEmitter() {
324   return emitter_.get();
325 }
326
327 void TcpNode::SetError_Locked(int pp_error_num) {
328   SocketNode::SetError_Locked(pp_error_num);
329   emitter_->SetError_Locked();
330 }
331
332 Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) {
333   if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
334     AUTO_LOCK(node_lock_);
335     int value = tcp_nodelay_;
336     socklen_t value_len = static_cast<socklen_t>(sizeof(value));
337     int copy_bytes = std::min(value_len, *len);
338     memcpy(optval, &value, copy_bytes);
339     *len = value_len;
340     return 0;
341   }
342
343   return SocketNode::GetSockOpt(lvl, optname, optval, len);
344 }
345
346 Error TcpNode::SetNoDelay_Locked() {
347   if (!IsConnected())
348     return 0;
349
350   int32_t error =
351       TCPInterface()->SetOption(socket_resource_,
352                                 PP_TCPSOCKET_OPTION_NO_DELAY,
353                                 PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
354                                 PP_BlockUntilComplete());
355   return PPErrorToErrno(error);
356 }
357
358 Error TcpNode::SetSockOpt(int lvl,
359                           int optname,
360                           const void* optval,
361                           socklen_t len) {
362   if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
363     if (static_cast<size_t>(len) < sizeof(int))
364       return EINVAL;
365     AUTO_LOCK(node_lock_);
366     tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
367     return SetNoDelay_Locked();
368   }
369
370   return SocketNode::SetSockOpt(lvl, optname, optval, len);
371 }
372
373 void TcpNode::QueueAccept() {
374   StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_);
375   stream()->EnqueueWork(work);
376 }
377
378 void TcpNode::QueueConnect() {
379   StreamFs::Work* work = new TCPConnectWork(stream(), emitter_);
380   stream()->EnqueueWork(work);
381 }
382
383 void TcpNode::QueueInput() {
384   StreamFs::Work* work = new TcpRecvWork(emitter_);
385   stream()->EnqueueWork(work);
386 }
387
388 void TcpNode::QueueOutput() {
389   if (TestStreamFlags(SSF_SENDING))
390     return;
391
392   if (!TestStreamFlags(SSF_CAN_SEND))
393     return;
394
395   if (0 == emitter_->BytesInOutputFIFO())
396     return;
397
398   StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this));
399   stream()->EnqueueWork(work);
400 }
401
402 Error TcpNode::Accept(const HandleAttr& attr,
403                       PP_Resource* out_sock,
404                       struct sockaddr* addr,
405                       socklen_t* len) {
406   EventListenerLock wait(GetEventEmitter());
407
408   if (!TestStreamFlags(SSF_LISTENING))
409     return EINVAL;
410
411   // Either block forever or not at all
412   int ms = attr.IsBlocking() ? -1 : 0;
413
414   Error err = wait.WaitOnEvent(POLLIN, ms);
415   if (ETIMEDOUT == err)
416     return EWOULDBLOCK;
417
418   int s = emitter_->GetAcceptedSocket_Locked();
419   // Non-blocking case.
420   if (s == 0)
421     return EAGAIN;
422
423   // Consume the new socket and start listening for the next one
424   *out_sock = s;
425   emitter_->ClearEvents_Locked(POLLIN);
426
427   // Set the out paramaters
428   PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
429   *len = ResourceToSockAddr(remote_addr, *len, addr);
430   filesystem_->ppapi()->ReleaseResource(remote_addr);
431
432   QueueAccept();
433   return 0;
434 }
435
436 // We can not bind a client socket with PPAPI.  For now we ignore the
437 // bind but report the correct address later, just in case someone is
438 // binding without really caring what the address is (for example to
439 // select a more optimized interface/route.)
440 Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) {
441   AUTO_LOCK(node_lock_);
442
443   /* Only bind once. */
444   if (IsBound())
445     return EINVAL;
446
447   local_addr_ = SockAddrToResource(addr, len);
448   int err = TCPInterface()->Bind(
449       socket_resource_, local_addr_, PP_BlockUntilComplete());
450
451   // If we fail, release the local addr resource
452   if (err != PP_OK) {
453     filesystem_->ppapi()->ReleaseResource(local_addr_);
454     local_addr_ = 0;
455     return PPErrorToErrno(err);
456   }
457
458   return 0;
459 }
460
461 Error TcpNode::Connect(const HandleAttr& attr,
462                        const struct sockaddr* addr,
463                        socklen_t len) {
464   EventListenerLock wait(GetEventEmitter());
465
466   if (TestStreamFlags(SSF_CONNECTING))
467     return EALREADY;
468
469   if (IsConnected())
470     return EISCONN;
471
472   remote_addr_ = SockAddrToResource(addr, len);
473   if (0 == remote_addr_)
474     return EINVAL;
475
476   int ms = attr.IsBlocking() ? -1 : 0;
477
478   SetStreamFlags(SSF_CONNECTING);
479   QueueConnect();
480
481   Error err = wait.WaitOnEvent(POLLOUT, ms);
482   if (ETIMEDOUT == err)
483     return EINPROGRESS;
484
485   // If we fail, release the dest addr resource
486   if (err != 0) {
487     ConnectFailed_Locked();
488     return err;
489   }
490
491   // Make sure the connection succeeded.
492   if (last_errno_ != 0) {
493     ConnectFailed_Locked();
494     return last_errno_;
495   }
496
497   ConnectDone_Locked();
498   return 0;
499 }
500
501 Error TcpNode::Shutdown(int how) {
502   AUTO_LOCK(node_lock_);
503   if (!IsConnected())
504     return ENOTCONN;
505
506   {
507     AUTO_LOCK(emitter_->GetLock());
508     emitter_->SetError_Locked();
509   }
510   return 0;
511 }
512
513 void TcpNode::ConnectDone_Locked() {
514   local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
515
516   // Now that we are connected, we can start sending and receiving.
517   ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
518   SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
519
520   emitter_->ConnectDone_Locked();
521
522   // The NODELAY option cannot be set in PPAPI before the socket
523   // is connected, but setsockopt() might have already set it.
524   SetNoDelay_Locked();
525
526   // Begin the input pump
527   QueueInput();
528 }
529
530 void TcpNode::ConnectFailed_Locked() {
531   filesystem_->ppapi()->ReleaseResource(remote_addr_);
532   remote_addr_ = 0;
533 }
534
535 Error TcpNode::Listen(int backlog) {
536   AUTO_LOCK(node_lock_);
537   if (!IsBound())
538     return EINVAL;
539
540   int err = TCPInterface()->Listen(
541       socket_resource_, backlog, PP_BlockUntilComplete());
542   if (err != PP_OK)
543     return PPErrorToErrno(err);
544
545   ClearStreamFlags(SSF_CAN_CONNECT);
546   SetStreamFlags(SSF_LISTENING);
547   emitter_->SetListening_Locked();
548   QueueAccept();
549   return 0;
550 }
551
552 Error TcpNode::Recv_Locked(void* buf,
553                            size_t len,
554                            PP_Resource* out_addr,
555                            int* out_len) {
556   assert(emitter_.get());
557   *out_len = emitter_->ReadIn_Locked((char*)buf, len);
558   *out_addr = remote_addr_;
559
560   // Ref the address copy we pass back.
561   filesystem_->ppapi()->AddRefResource(remote_addr_);
562   return 0;
563 }
564
565 // TCP ignores dst addr passed to send_to, and always uses bound address
566 Error TcpNode::Send_Locked(const void* buf,
567                            size_t len,
568                            PP_Resource,
569                            int* out_len) {
570   assert(emitter_.get());
571   if (emitter_->GetError_Locked())
572     return EPIPE;
573   *out_len = emitter_->WriteOut_Locked((char*)buf, len);
574   return 0;
575 }
576
577 }  // namespace nacl_io
578
579 #endif  // PROVIDES_SOCKET_API