1 // Copyright (c) 2013 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
6 #include "nacl_io/mount_node_udp.h"
13 #include "nacl_io/event_emitter_udp.h"
14 #include "nacl_io/mount_stream.h"
15 #include "nacl_io/packet.h"
16 #include "nacl_io/pepper_interface.h"
// Size limits for UDP traffic in this file.
// kMaxPacketSize sizes the receive buffer allocated by UDPRecvWork and caps
// the length accepted by a single MountNodeUDP::Send_Locked call.
19 const size_t kMaxPacketSize = 65536;
// Room for eight maximum-size packets. The same value is passed for both
// arguments of the EventEmitterUDP constructor (presumably the RX and TX
// FIFO sizes -- confirm against EventEmitterUDP).
20 const size_t kDefaultFifoSize = kMaxPacketSize * 8;
// Common base class for the UDP send/receive work items below.
// It is built from a ScopedEventEmitterUDP and passes the MountStream of the
// emitter's stream to the MountStream::Work base.
// NOTE(review): the rest of the initialiser list and the constructor body are
// not visible in this excerpt; emitter_ is presumably initialised from the
// constructor argument -- confirm.
25 class UDPWork : public MountStream::Work {
27 explicit UDPWork(const ScopedEventEmitterUDP& emitter)
28 : MountStream::Work(emitter->stream()->mount_stream()),
// Convenience accessor: fetches the UDP socket interface from the PPAPI
// object owned by this work item's mount.
37 UDPSocketInterface* UDPInterface() {
38 return mount()->ppapi()->GetUDPSocketInterface();
// Emitter this work item operates on; the derived classes take its lock
// (GetLock()) at the top of Start() and Run().
42 ScopedEventEmitterUDP emitter_;
// Work item that takes one packet from the emitter's TX queue and hands it
// to the UDP interface's SendTo().
// It holds a ScopedMountNodeSocket to the node it sends for, in addition to
// the emitter held by UDPWork.
46 class UDPSendWork : public UDPWork {
48 explicit UDPSendWork(const ScopedEventEmitterUDP& emitter,
49 const ScopedMountNodeSocket& node)
50 : UDPWork(emitter), node_(node) {}
// Start(): runs under the emitter lock.
//   1. Checks that the node has SSF_CAN_SEND set.
//   2. Reads the next TX packet from the emitter into packet_.
//   3. Issues SendTo() on the node's socket resource, with a completion
//      obtained from mount()->GetRunCompletion(this).
//   4. Any result other than PP_OK_COMPLETIONPENDING is recorded on the node
//      via SetError_Locked(); otherwise SSF_SENDING is set.
// NOTE(review): the return statements, the handling of a missing packet and
// the middle SendTo() arguments are not visible in this excerpt. The `val`
// parameter is not used in the visible lines.
52 virtual bool Start(int32_t val) {
53 AUTO_LOCK(emitter_->GetLock());
55 // Does the stream exist, and can it send?
56 if (!node_->TestStreamFlags(SSF_CAN_SEND))
59 packet_ = emitter_->ReadTXPacket_Locked();
63 int err = UDPInterface()->SendTo(node_->socket_resource(),
67 mount()->GetRunCompletion(this));
68 if (err != PP_OK_COMPLETIONPENDING) {
69 // Anything else, we should assume the socket has gone bad.
70 node_->SetError_Locked(err);
74 node_->SetStreamFlags(SSF_SENDING);
// Run(): runs under the emitter lock; presumably invoked through the
// completion created in Start() -- confirm against MountStream::Work.
// A negative length_error is recorded on the node as an error. The visible
// success path clears SSF_SENDING; what follows (e.g. queuing further
// output) is not visible in this excerpt.
78 virtual void Run(int32_t length_error) {
79 AUTO_LOCK(emitter_->GetLock());
81 if (length_error < 0) {
82 node_->SetError_Locked(length_error);
86 // If we did send, then queue more work.
87 node_->ClearStreamFlags(SSF_SENDING);
92 // We assume that transmits will always complete. If the upstream
93 // actually back pressures, enough to prevent the Send callback
94 // from triggering, this resource may never go away.
95 ScopedMountNodeSocket node_;
// Work item that issues one RecvFrom() on the stream's socket and, on
// success, turns the received bytes into a Packet on the emitter's RX queue.
99 class UDPRecvWork : public UDPWork {
// The constructor allocates a kMaxPacketSize-byte receive buffer (data_).
// NOTE(review): data_ comes from a raw new[]; the matching delete[] is
// presumably in a destructor that is not visible in this excerpt -- confirm.
101 explicit UDPRecvWork(const ScopedEventEmitterUDP& emitter)
103 data_ = new char[kMaxPacketSize];
// Start(): runs under the emitter lock.
//   1. Fetches the emitter's stream (cast to MountNodeUDP*) and checks that
//      it is non-NULL and has SSF_CAN_RECV set.
//   2. Checks SSF_RECVING, i.e. whether a receive is already in progress.
//   3. Sets SSF_RECVING and issues RecvFrom() with a completion obtained
//      from mount()->GetRunCompletion(this).
//   4. Any result other than PP_OK_COMPLETIONPENDING is recorded on the
//      stream via SetError_Locked().
// NOTE(review): the return statements and the middle RecvFrom() arguments
// are not visible in this excerpt. The `val` parameter is not used in the
// visible lines.
110 virtual bool Start(int32_t val) {
111 AUTO_LOCK(emitter_->GetLock());
112 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
114 // Does the stream exist, and can it recv?
115 if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
118 // Check if we are already receiving.
119 if (stream->TestStreamFlags(SSF_RECVING))
122 stream->SetStreamFlags(SSF_RECVING);
123 int err = UDPInterface()->RecvFrom(stream->socket_resource(),
127 mount()->GetRunCompletion(this));
128 if (err != PP_OK_COMPLETIONPENDING) {
129 stream->SetError_Locked(err);
// Run(): runs under the emitter lock; presumably invoked through the
// completion created in Start() -- confirm against MountStream::Work.
// A positive length_error is treated as a byte count: the first length_error
// bytes of data_ and addr_ are copied into a new Packet, which is written to
// the emitter's RX queue; SSF_RECVING is then cleared and another receive is
// queued on the stream.
// The SetError_Locked() call at the end is presumably the else-branch for
// non-positive values; the branch structure and any NULL check on `stream`
// are not visible in this excerpt.
136 virtual void Run(int32_t length_error) {
137 AUTO_LOCK(emitter_->GetLock());
138 MountNodeUDP* stream = static_cast<MountNodeUDP*>(emitter_->stream());
142 // On successful receive we queue more input
143 if (length_error > 0) {
144 Packet* packet = new Packet(mount()->ppapi());
145 packet->Copy(data_, length_error, addr_);
146 emitter_->WriteRXPacket_Locked(packet);
147 stream->ClearStreamFlags(SSF_RECVING);
148 stream->QueueInput();
150 stream->SetError_Locked(length_error);
// Constructs a UDP socket node on top of MountNodeSocket.
// Creates the node's EventEmitterUDP, passing kDefaultFifoSize for both
// constructor arguments, and attaches this node to the emitter as its stream.
160 MountNodeUDP::MountNodeUDP(Mount* mount)
161 : MountNodeSocket(mount),
162 emitter_(new EventEmitterUDP(kDefaultFifoSize, kDefaultFifoSize)) {
163 emitter_->AttachStream(this);
// Tears the node down: detaches it from the emitter first, then runs the
// base-class MountNodeSocket::Destroy().
// After detaching, emitter_->stream() is presumably NULL, which is what the
// NULL check in UDPRecvWork::Start() guards against -- confirm against
// EventEmitterUDP.
166 void MountNodeUDP::Destroy() {
167 emitter_->DetachStream();
168 MountNodeSocket::Destroy();
// Returns the raw pointer held by emitter_. No reference is added here, so
// the returned pointer is borrowed from this node.
171 EventEmitterUDP* MountNodeUDP::GetEventEmitter() {
172 return emitter_.get();
// Initialises the node: runs MountNodeSocket::Init(open_flags), checks that
// a UDP socket interface is available, and creates the PPAPI UDP socket
// resource for the mount's instance, storing it in socket_resource_.
// A resource value of 0 is tested for after creation.
// NOTE(review): the error codes returned on each failed check are not
// visible in this excerpt.
175 Error MountNodeUDP::Init(int open_flags) {
176 Error err = MountNodeSocket::Init(open_flags);
180 if (UDPInterface() == NULL)
183 socket_resource_ = UDPInterface()->Create(mount_->ppapi()->GetInstance());
184 if (0 == socket_resource_)
// Schedules one receive: allocates a UDPRecvWork bound to this node's
// emitter and enqueues it on the node's MountStream.
// NOTE(review): `work` is allocated with raw new and not freed here;
// EnqueueWork() presumably takes ownership -- confirm against MountStream.
190 void MountNodeUDP::QueueInput() {
191 UDPRecvWork* work = new UDPRecvWork(emitter_);
192 mount_stream()->EnqueueWork(work);
// Schedules one send: allocates a UDPSendWork bound to this node's emitter
// and to a ScopedMountNodeSocket referring to this node, then enqueues it on
// the node's MountStream.
// Two conditions are tested first: SSF_CAN_SEND not set, and SSF_SENDING
// already set. The statements under them are not visible in this excerpt;
// presumably both return early without queuing -- confirm.
195 void MountNodeUDP::QueueOutput() {
196 if (!TestStreamFlags(SSF_CAN_SEND))
199 if (TestStreamFlags(SSF_SENDING))
202 UDPSendWork* work = new UDPSendWork(emitter_, ScopedMountNodeSocket(this));
203 mount_stream()->EnqueueWork(work);
// Binds the UDP socket resource to the given address.
//   addr, len: socket address to bind to; converted to a PP_Resource with
//              SockAddrToResource().
// The bind is issued with PP_BlockUntilComplete() as the completion callback
// (presumably making the call synchronous -- confirm against PPAPI docs).
// One visible path releases the address resource and returns the PPAPI error
// translated by PPErrorToErrno(); this is presumably the bind-failure path.
// Otherwise SSF_CAN_SEND and SSF_CAN_RECV are set and the address resource
// is stored in local_addr_.
// NOTE(review): the return values for a zero socket_resource_, the
// "only bind once" check itself, the validation of out_addr and the final
// return are not visible in this excerpt.
206 Error MountNodeUDP::Bind(const struct sockaddr* addr, socklen_t len) {
207 if (0 == socket_resource_)
210 /* Only bind once. */
214 PP_Resource out_addr = SockAddrToResource(addr, len);
218 int err = UDPInterface()->Bind(socket_resource_,
220 PP_BlockUntilComplete());
222 mount_->ppapi()->ReleaseResource(out_addr);
223 return PPErrorToErrno(err);
226 // Now that we are bound, we can start sending and receiving.
227 SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
230 local_addr_ = out_addr;
// Sets the remote address stored in remote_addr_.
//   attr:      handle attributes; not used in the visible lines.
//   addr, len: new remote address; converted to a PP_Resource with
//              SockAddrToResource() (len is used here although its
//              declaration is not visible in this excerpt).
// If a remote address was already set, its resource is released before the
// new one is stored. A resulting resource of 0 is tested for.
// NOTE(review): the return values for each check are not visible in this
// excerpt.
234 Error MountNodeUDP::Connect(const HandleAttr& attr,
235 const struct sockaddr* addr,
237 if (0 == socket_resource_)
240 /* For UDP, Connect only sets the default destination, so changing it is legal. */
241 if (remote_addr_ != 0) {
242 mount_->ppapi()->ReleaseResource(remote_addr_);
246 remote_addr_ = SockAddrToResource(addr, len);
247 if (0 == remote_addr_)
// Takes one packet from the emitter's RX queue and copies it into buf.
//   buf:      destination buffer.
//   len:      capacity of buf (used here; declaration not visible).
//   out_addr: if the packet has a non-zero address resource, a reference is
//             added to it and it is stored here.
//   out_len:  receives the number of bytes copied (declaration not visible).
// The copy length is the smaller of len and the packet's length.
// NOTE(review): std::min<int> converts both operands to int; if len is an
// unsigned type larger than INT_MAX that conversion would not preserve the
// value -- confirm the parameter type and callers.
// NOTE(review): the handling of a missing packet, the disposal of the packet
// after copying and the return values are not visible in this excerpt.
253 Error MountNodeUDP::Recv_Locked(void* buf,
255 PP_Resource* out_addr,
257 Packet* packet = emitter_->ReadRXPacket_Locked();
263 static_cast<int32_t>(std::min<int>(len, packet->len()));
264 memcpy(buf, packet->buffer(), capped_len);
266 if (packet->addr() != 0) {
267 mount_->ppapi()->AddRefResource(packet->addr());
268 *out_addr = packet->addr();
271 *out_len = capped_len;
276 // Should never happen, Recv_Locked should not be called
277 // unless already in a POLLIN state.
// Queues one outgoing packet on the emitter's TX queue.
//   buf:     source bytes.
//   len:     number of bytes requested; capped at kMaxPacketSize.
//   addr:    destination passed through to Packet::Copy().
//   out_len: receives the capped length, i.e. the number of bytes queued.
// (len, addr and out_len are used here; their declarations are not visible
// in this excerpt.)
// A new Packet is allocated for the data and handed to
// WriteTXPacket_Locked(); it is not freed in the visible lines.
// NOTE(review): std::min<int> converts both operands to int -- confirm that
// len cannot exceed INT_MAX given its (unseen) parameter type.
// NOTE(review): whether output work is queued afterwards, and the return
// value, are not visible in this excerpt.
281 Error MountNodeUDP::Send_Locked(const void* buf,
287 static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
288 Packet* packet = new Packet(mount_->ppapi());
289 packet->Copy(buf, capped_len, addr);
291 emitter_->WriteTXPacket_Locked(packet);
292 *out_len = capped_len;
296 } // namespace nacl_io