Upstream version 11.40.271.0
[platform/framework/web/crosswalk.git] / src / mojo / services / public / cpp / network / udp_socket_wrapper.cc
1 // Copyright 2014 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "mojo/services/public/cpp/network/udp_socket_wrapper.h"
6
7 #include <assert.h>
8
9 #include "mojo/public/cpp/environment/logging.h"
10
11 namespace mojo {
namespace {

// Receive queue size used by the UDPSocketWrapper constructor that does not
// take an explicit |receive_queue_slots| argument.
const uint32_t kDefaultReceiveQueueSlots = 32u;

}  // namespace
17
18 UDPSocketWrapper::NegotiateCallbackHandler::NegotiateCallbackHandler(
19     UDPSocketWrapper* delegate)
20     : delegate_(delegate) {
21 }
22
23 UDPSocketWrapper::NegotiateCallbackHandler::~NegotiateCallbackHandler() {}
24
25 void UDPSocketWrapper::NegotiateCallbackHandler::Run(
26     uint32_t actual_size) const {
27   delegate_->OnNegotiateMaxPendingSendRequestsCompleted(actual_size);
28 }
29
30 UDPSocketWrapper::SendCallbackHandler::SendCallbackHandler(
31     UDPSocketWrapper* delegate,
32     const ErrorCallback& forward_callback)
33     : delegate_(delegate),
34       forward_callback_(forward_callback) {
35 }
36
37 UDPSocketWrapper::SendCallbackHandler::~SendCallbackHandler() {}
38
39 void UDPSocketWrapper::SendCallbackHandler::Run(NetworkErrorPtr result) const {
40   delegate_->OnSendToCompleted(result.Pass(), forward_callback_);
41 }
42
43 UDPSocketWrapper::ReceivedData::ReceivedData() {}
44 UDPSocketWrapper::ReceivedData::~ReceivedData() {}
45
46 UDPSocketWrapper::SendRequest::SendRequest() {}
47 UDPSocketWrapper::SendRequest::~SendRequest() {}
48
49 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket)
50   : socket_(socket.Pass()),
51     max_receive_queue_size_(kDefaultReceiveQueueSlots),
52     max_pending_sends_(1),
53     current_pending_sends_(0) {
54   Initialize(0);
55 }
56
57 UDPSocketWrapper::UDPSocketWrapper(UDPSocketPtr socket,
58                                    uint32_t receive_queue_slots,
59                                    uint32_t requested_max_pending_sends)
60   : socket_(socket.Pass()),
61     max_receive_queue_size_(receive_queue_slots),
62     max_pending_sends_(1),
63     current_pending_sends_(0) {
64   Initialize(requested_max_pending_sends);
65 }
66
67 UDPSocketWrapper::~UDPSocketWrapper() {
68   while (!receive_queue_.empty()) {
69     delete receive_queue_.front();
70     receive_queue_.pop();
71   }
72   while (!send_requests_.empty()) {
73     delete send_requests_.front();
74     send_requests_.pop();
75   }
76 }
77
78 void UDPSocketWrapper::AllowAddressReuse(const ErrorCallback& callback) {
79   socket_->AllowAddressReuse(callback);
80 }
81
82 void UDPSocketWrapper::Bind(
83     NetAddressPtr addr,
84     const Callback<void(NetworkErrorPtr, NetAddressPtr)>& callback) {
85   socket_->Bind(addr.Pass(), callback);
86 }
87
88 void UDPSocketWrapper::SetSendBufferSize(uint32_t size,
89                                          const ErrorCallback& callback) {
90   socket_->SetSendBufferSize(size, callback);
91 }
92
93 void UDPSocketWrapper::SetReceiveBufferSize(uint32_t size,
94                                             const ErrorCallback& callback) {
95   socket_->SetReceiveBufferSize(size, callback);
96 }
97
98 bool UDPSocketWrapper::ReceiveFrom(const ReceiveCallback& callback) {
99   if (receive_queue_.empty()) {
100     receive_requests_.push(callback);
101     return false;
102   }
103
104   ReceivedData* data = receive_queue_.front();
105   receive_queue_.pop();
106   socket_->ReceiveMore(1);
107   callback.Run(data->result.Pass(), data->src_addr.Pass(), data->data.Pass());
108   delete data;
109   return true;
110 }
111
112 void UDPSocketWrapper::SendTo(NetAddressPtr dest_addr,
113                               Array<uint8_t> data,
114                               const ErrorCallback& callback) {
115   if (current_pending_sends_ >= max_pending_sends_) {
116     SendRequest* request = new SendRequest();
117     request->dest_addr = dest_addr.Pass();
118     request->data = data.Pass();
119     request->callback = callback;
120     send_requests_.push(request);
121     return;
122   }
123
124   MOJO_DCHECK(send_requests_.empty());
125   current_pending_sends_++;
126   socket_->SendTo(dest_addr.Pass(), data.Pass(),
127                   ErrorCallback(static_cast<ErrorCallback::Runnable*>(
128                       new SendCallbackHandler(this, callback))));
129 }
130
131 void UDPSocketWrapper::OnReceived(NetworkErrorPtr result,
132                                   NetAddressPtr src_addr,
133                                   Array<uint8_t> data) {
134   if (!receive_requests_.empty()) {
135     // The cache should be empty if there are user requests waiting for data.
136     MOJO_DCHECK(receive_queue_.empty());
137
138     socket_->ReceiveMore(1);
139
140     ReceiveCallback callback = receive_requests_.front();
141     receive_requests_.pop();
142
143     callback.Run(result.Pass(), src_addr.Pass(), data.Pass());
144     return;
145   }
146
147   MOJO_DCHECK(receive_queue_.size() < max_receive_queue_size_);
148   ReceivedData* received_data = new ReceivedData();
149   received_data->result = result.Pass();
150   received_data->src_addr = src_addr.Pass();
151   received_data->data = data.Pass();
152   receive_queue_.push(received_data);
153 }
154
155 void UDPSocketWrapper::Initialize(uint32_t requested_max_pending_sends) {
156   socket_.set_client(this);
157   socket_->NegotiateMaxPendingSendRequests(
158       requested_max_pending_sends,
159       Callback<void(uint32_t)>(
160           static_cast< Callback<void(uint32_t)>::Runnable*>(
161               new NegotiateCallbackHandler(this))));
162   socket_->ReceiveMore(max_receive_queue_size_);
163 }
164
165 void UDPSocketWrapper::OnNegotiateMaxPendingSendRequestsCompleted(
166     uint32_t actual_size) {
167   MOJO_DCHECK(max_pending_sends_ == 1);
168
169   if (actual_size == 0) {
170     assert(false);
171     return;
172   }
173
174   max_pending_sends_ = actual_size;
175
176   while (ProcessNextSendRequest());
177 }
178
179 void UDPSocketWrapper::OnSendToCompleted(
180     NetworkErrorPtr result,
181     const ErrorCallback& forward_callback) {
182   current_pending_sends_--;
183   ProcessNextSendRequest();
184
185   forward_callback.Run(result.Pass());
186 }
187
188 bool UDPSocketWrapper::ProcessNextSendRequest() {
189   if (current_pending_sends_ >= max_pending_sends_ || send_requests_.empty())
190     return false;
191
192   SendRequest* request = send_requests_.front();
193   send_requests_.pop();
194
195   current_pending_sends_++;
196
197   socket_->SendTo(
198       request->dest_addr.Pass(), request->data.Pass(),
199       ErrorCallback(static_cast<ErrorCallback::Runnable*>(
200           new SendCallbackHandler(this, request->callback))));
201
202   delete request;
203
204   return true;
205 }
206
207 }  // namespace mojo