- add sources.
[platform/framework/web/crosswalk.git] / src / native_client_sdk / src / libraries / nacl_io / mount_node_udp.cc
1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5
6 #include "nacl_io/mount_node_udp.h"
7
8 #include <errno.h>
9 #include <string.h>
10
11 #include <algorithm>
12
13 #include "nacl_io/event_emitter_udp.h"
14 #include "nacl_io/mount_stream.h"
15 #include "nacl_io/packet.h"
16 #include "nacl_io/pepper_interface.h"
17
namespace {

// Size in bytes of the buffer used to receive a single datagram, and the
// upper bound applied to the length of an outgoing datagram.
const size_t kMaxPacketSize = 65536;

// Capacity handed to the event emitter for each of its two FIFOs: room for
// eight maximum-size packets.
const size_t kDefaultFifoSize = 8 * kMaxPacketSize;

}  // namespace
22
23 namespace nacl_io {
24
25 class UDPWork : public MountStream::Work {
26  public:
27   explicit UDPWork(const ScopedEventEmitterUDP& emitter)
28       : MountStream::Work(emitter->stream()->mount_stream()),
29         emitter_(emitter),
30         packet_(NULL) {
31   }
32
33   ~UDPWork() {
34     delete packet_;
35   }
36
37   UDPSocketInterface* UDPInterface() {
38     return mount()->ppapi()->GetUDPSocketInterface();
39   }
40
41  protected:
42   ScopedEventEmitterUDP emitter_;
43   Packet* packet_;
44 };
45
46 class UDPSendWork : public UDPWork {
47  public:
48   explicit UDPSendWork(const ScopedEventEmitterUDP& emitter,
49                        const ScopedMountNodeSocket& node)
50       : UDPWork(emitter), node_(node) {}
51
52   virtual bool Start(int32_t val) {
53     AUTO_LOCK(emitter_->GetLock());
54
55     // Does the stream exist, and can it send?
56     if (!node_->TestStreamFlags(SSF_CAN_SEND))
57       return false;
58
59     packet_ = emitter_->ReadTXPacket_Locked();
60     if (NULL == packet_)
61       return false;
62
63     int err = UDPInterface()->SendTo(node_->socket_resource(),
64                                      packet_->buffer(),
65                                      packet_->len(),
66                                      packet_->addr(),
67                                      mount()->GetRunCompletion(this));
68     if (err != PP_OK_COMPLETIONPENDING) {
69       // Anything else, we should assume the socket has gone bad.
70       node_->SetError_Locked(err);
71       return false;
72     }
73
74     node_->SetStreamFlags(SSF_SENDING);
75     return true;
76   }
77
78   virtual void Run(int32_t length_error) {
79     AUTO_LOCK(emitter_->GetLock());
80
81     if (length_error < 0) {
82       node_->SetError_Locked(length_error);
83       return;
84     }
85
86     // If we did send, then Q more work.
87     node_->ClearStreamFlags(SSF_SENDING);
88     node_->QueueOutput();
89   }
90
91  private:
92   // We assume that transmits will always complete.  If the upstream
93   // actually back pressures, enough to prevent the Send callback
94   // from triggering, this resource may never go away.
95   ScopedMountNodeSocket node_;
96 };
97
98
99 class UDPRecvWork : public UDPWork {
100  public:
101   explicit UDPRecvWork(const ScopedEventEmitterUDP& emitter)
102       : UDPWork(emitter) {
103     data_ = new char[kMaxPacketSize];
104   }
105
106   ~UDPRecvWork() {
107     delete[] data_;
108   }
109
110   virtual bool Start(int32_t val) {
111     AUTO_LOCK(emitter_->GetLock());
112     MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
113
114     // Does the stream exist, and can it recv?
115     if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
116       return false;
117
118     // Check if we are already receiving.
119     if (stream->TestStreamFlags(SSF_RECVING))
120       return false;
121
122     stream->SetStreamFlags(SSF_RECVING);
123     int err = UDPInterface()->RecvFrom(stream->socket_resource(),
124                                        data_,
125                                        kMaxPacketSize,
126                                        &addr_,
127                                        mount()->GetRunCompletion(this));
128     if (err != PP_OK_COMPLETIONPENDING) {
129       stream->SetError_Locked(err);
130       return false;
131     }
132
133     return true;
134   }
135
136   virtual void Run(int32_t length_error) {
137     AUTO_LOCK(emitter_->GetLock());
138     MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
139     if (NULL == stream)
140       return;
141
142     // On successful receive we queue more input
143     if (length_error > 0) {
144       Packet* packet = new Packet(mount()->ppapi());
145       packet->Copy(data_, length_error, addr_);
146       emitter_->WriteRXPacket_Locked(packet);
147       stream->ClearStreamFlags(SSF_RECVING);
148       stream->QueueInput();
149     } else {
150       stream->SetError_Locked(length_error);
151     }
152   }
153
154  private:
155   char* data_;
156   PP_Resource addr_;
157 };
158
159
160 MountNodeUDP::MountNodeUDP(Mount* mount)
161     : MountNodeSocket(mount),
162       emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) {
163   emitter_->AttachStream(this);
164 }
165
166 void MountNodeUDP::Destroy() {
167   emitter_->DetachStream();
168   MountNodeSocket::Destroy();
169 }
170
171 EventEmitterUDP* MountNodeUDP::GetEventEmitter() {
172   return emitter_.get();
173 }
174
175 Error MountNodeUDP::Init(int open_flags) {
176   Error err = MountNodeSocket::Init(open_flags);
177   if (err != 0)
178     return err;
179
180   if (UDPInterface() == NULL)
181     return EACCES;
182
183   socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance());
184   if (0 == socket_resource_)
185     return EACCES;
186
187   return 0;
188 }
189
190 void MountNodeUDP::QueueInput() {
191   UDPRecvWork* work = new UDPRecvWork(emitter_);
192   mount_stream()->EnqueueWork(work);
193 }
194
195 void MountNodeUDP::QueueOutput() {
196   if (!TestStreamFlags(SSF_CAN_SEND))
197     return;
198
199   if (TestStreamFlags(SSF_SENDING))
200     return;
201
202   UDPSendWork* work = new UDPSendWork(emitter_, ScopedMountNodeSocket(this));
203   mount_stream()->EnqueueWork(work);
204 }
205
206 Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
207   if (0 == socket_resource_)
208     return EBADF;
209
210   /* Only bind once. */
211   if (IsBound())
212     return EINVAL;
213
214   PP_Resource out_addr = SockAddrToResource(addr, len);
215   if (0 == out_addr)
216     return EINVAL;
217
218   int err = UDPInterface()->Bind(socket_resource_,
219                                  out_addr,
220                                  PP_BlockUntilComplete());
221   if (err != 0) {
222     mount_->ppapi()->ReleaseResource(out_addr);
223     return PPErrorToErrno(err);
224   }
225
226   // Now that we are bound, we can start sending and receiving.
227   SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
228   QueueInput();
229
230   local_addr_ = out_addr;
231   return 0;
232 }
233
234 Error MountNodeUDP::Connect(const HandleAttr& attr,
235                             const struct sockaddr* addr,
236                             socklen_t len) {
237   if (0 == socket_resource_)
238     return EBADF;
239
240   /* Connect for UDP is the default dest, it's legal to change it. */
241   if (remote_addr_ != 0) {
242     mount_->ppapi()->ReleaseResource(remote_addr_);
243     remote_addr_ = 0;
244   }
245
246   remote_addr_ = SockAddrToResource(addr, len);
247   if (0 == remote_addr_)
248     return EINVAL;
249
250   return 0;
251 }
252
253 Error MountNodeUDP::Recv_Locked(void* buf,
254                                 size_t len,
255                                 PP_Resource* out_addr,
256                                 int* out_len) {
257   Packet* packet = emitter_->ReadRXPacket_Locked();
258   *out_len = 0;
259   *out_addr = 0;
260
261   if (packet) {
262     int capped_len =
263         static_cast<int32_t>(std::min<int>(len, packet->len()));
264     memcpy(buf, packet->buffer(), capped_len);
265
266     if (packet->addr() != 0) {
267       mount_->ppapi()->AddRefResource(packet->addr());
268       *out_addr = packet->addr();
269     }
270
271     *out_len = capped_len;
272     delete packet;
273     return 0;
274   }
275
276   // Should never happen, Recv_Locked should not be called
277   // unless already in a POLLIN state.
278   return EBADF;
279 }
280
281 Error MountNodeUDP::Send_Locked(const void* buf,
282                                 size_t len,
283                                 PP_Resource addr,
284                                 int* out_len) {
285   *out_len = 0;
286   int capped_len =
287       static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
288   Packet* packet = new Packet(mount_->ppapi());
289   packet->Copy(buf, capped_len, addr);
290
291   emitter_->WriteTXPacket_Locked(packet);
292   *out_len = capped_len;
293   return 0;
294 }
295
296 }  // namespace nacl_io
297