Update To 11.40.268.0
[platform/framework/web/crosswalk.git] / src / remoting / protocol / channel_multiplexer.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "remoting/protocol/channel_multiplexer.h"
6
7 #include <string.h>
8
9 #include "base/bind.h"
10 #include "base/callback.h"
11 #include "base/location.h"
12 #include "base/single_thread_task_runner.h"
13 #include "base/stl_util.h"
14 #include "base/thread_task_runner_handle.h"
15 #include "net/base/net_errors.h"
16 #include "net/socket/stream_socket.h"
17 #include "remoting/protocol/message_serialization.h"
18
19 namespace remoting {
20 namespace protocol {
21
22 namespace {
23 const int kChannelIdUnknown = -1;
24 const int kMaxPacketSize = 1024;
25
26 class PendingPacket {
27  public:
28   PendingPacket(scoped_ptr<MultiplexPacket> packet,
29                 const base::Closure& done_task)
30       : packet(packet.Pass()),
31         done_task(done_task),
32         pos(0U) {
33   }
34   ~PendingPacket() {
35     done_task.Run();
36   }
37
38   bool is_empty() { return pos >= packet->data().size(); }
39
40   int Read(char* buffer, size_t size) {
41     size = std::min(size, packet->data().size() - pos);
42     memcpy(buffer, packet->data().data() + pos, size);
43     pos += size;
44     return size;
45   }
46
47  private:
48   scoped_ptr<MultiplexPacket> packet;
49   base::Closure done_task;
50   size_t pos;
51
52   DISALLOW_COPY_AND_ASSIGN(PendingPacket);
53 };
54
55 }  // namespace
56
57 const char ChannelMultiplexer::kMuxChannelName[] = "mux";
58
59 struct ChannelMultiplexer::PendingChannel {
60   PendingChannel(const std::string& name,
61                  const ChannelCreatedCallback& callback)
62       : name(name), callback(callback) {
63   }
64   std::string name;
65   ChannelCreatedCallback callback;
66 };
67
68 class ChannelMultiplexer::MuxChannel {
69  public:
70   MuxChannel(ChannelMultiplexer* multiplexer, const std::string& name,
71              int send_id);
72   ~MuxChannel();
73
74   const std::string& name() { return name_; }
75   int receive_id() { return receive_id_; }
76   void set_receive_id(int id) { receive_id_ = id; }
77
78   // Called by ChannelMultiplexer.
79   scoped_ptr<net::StreamSocket> CreateSocket();
80   void OnIncomingPacket(scoped_ptr<MultiplexPacket> packet,
81                         const base::Closure& done_task);
82   void OnWriteFailed();
83
84   // Called by MuxSocket.
85   void OnSocketDestroyed();
86   bool DoWrite(scoped_ptr<MultiplexPacket> packet,
87                const base::Closure& done_task);
88   int DoRead(net::IOBuffer* buffer, int buffer_len);
89
90  private:
91   ChannelMultiplexer* multiplexer_;
92   std::string name_;
93   int send_id_;
94   bool id_sent_;
95   int receive_id_;
96   MuxSocket* socket_;
97   std::list<PendingPacket*> pending_packets_;
98
99   DISALLOW_COPY_AND_ASSIGN(MuxChannel);
100 };
101
102 class ChannelMultiplexer::MuxSocket : public net::StreamSocket,
103                                       public base::NonThreadSafe,
104                                       public base::SupportsWeakPtr<MuxSocket> {
105  public:
106   MuxSocket(MuxChannel* channel);
107   ~MuxSocket() override;
108
109   void OnWriteComplete();
110   void OnWriteFailed();
111   void OnPacketReceived();
112
113   // net::StreamSocket interface.
114   int Read(net::IOBuffer* buffer,
115            int buffer_len,
116            const net::CompletionCallback& callback) override;
117   int Write(net::IOBuffer* buffer,
118             int buffer_len,
119             const net::CompletionCallback& callback) override;
120
121   int SetReceiveBufferSize(int32 size) override {
122     NOTIMPLEMENTED();
123     return net::ERR_NOT_IMPLEMENTED;
124   }
125   int SetSendBufferSize(int32 size) override {
126     NOTIMPLEMENTED();
127     return net::ERR_NOT_IMPLEMENTED;
128   }
129
130   int Connect(const net::CompletionCallback& callback) override {
131     NOTIMPLEMENTED();
132     return net::ERR_NOT_IMPLEMENTED;
133   }
134   void Disconnect() override { NOTIMPLEMENTED(); }
135   bool IsConnected() const override {
136     NOTIMPLEMENTED();
137     return true;
138   }
139   bool IsConnectedAndIdle() const override {
140     NOTIMPLEMENTED();
141     return false;
142   }
143   int GetPeerAddress(net::IPEndPoint* address) const override {
144     NOTIMPLEMENTED();
145     return net::ERR_NOT_IMPLEMENTED;
146   }
147   int GetLocalAddress(net::IPEndPoint* address) const override {
148     NOTIMPLEMENTED();
149     return net::ERR_NOT_IMPLEMENTED;
150   }
151   const net::BoundNetLog& NetLog() const override {
152     NOTIMPLEMENTED();
153     return net_log_;
154   }
155   void SetSubresourceSpeculation() override { NOTIMPLEMENTED(); }
156   void SetOmniboxSpeculation() override { NOTIMPLEMENTED(); }
157   bool WasEverUsed() const override { return true; }
158   bool UsingTCPFastOpen() const override { return false; }
159   bool WasNpnNegotiated() const override { return false; }
160   net::NextProto GetNegotiatedProtocol() const override {
161     return net::kProtoUnknown;
162   }
163   bool GetSSLInfo(net::SSLInfo* ssl_info) override {
164     NOTIMPLEMENTED();
165     return false;
166   }
167
168  private:
169   MuxChannel* channel_;
170
171   net::CompletionCallback read_callback_;
172   scoped_refptr<net::IOBuffer> read_buffer_;
173   int read_buffer_size_;
174
175   bool write_pending_;
176   int write_result_;
177   net::CompletionCallback write_callback_;
178
179   net::BoundNetLog net_log_;
180
181   DISALLOW_COPY_AND_ASSIGN(MuxSocket);
182 };
183
184
185 ChannelMultiplexer::MuxChannel::MuxChannel(
186     ChannelMultiplexer* multiplexer,
187     const std::string& name,
188     int send_id)
189     : multiplexer_(multiplexer),
190       name_(name),
191       send_id_(send_id),
192       id_sent_(false),
193       receive_id_(kChannelIdUnknown),
194       socket_(NULL) {
195 }
196
197 ChannelMultiplexer::MuxChannel::~MuxChannel() {
198   // Socket must be destroyed before the channel.
199   DCHECK(!socket_);
200   STLDeleteElements(&pending_packets_);
201 }
202
203 scoped_ptr<net::StreamSocket> ChannelMultiplexer::MuxChannel::CreateSocket() {
204   DCHECK(!socket_);  // Can't create more than one socket per channel.
205   scoped_ptr<MuxSocket> result(new MuxSocket(this));
206   socket_ = result.get();
207   return result.Pass();
208 }
209
210 void ChannelMultiplexer::MuxChannel::OnIncomingPacket(
211     scoped_ptr<MultiplexPacket> packet,
212     const base::Closure& done_task) {
213   DCHECK_EQ(packet->channel_id(), receive_id_);
214   if (packet->data().size() > 0) {
215     pending_packets_.push_back(new PendingPacket(packet.Pass(), done_task));
216     if (socket_) {
217       // Notify the socket that we have more data.
218       socket_->OnPacketReceived();
219     }
220   }
221 }
222
223 void ChannelMultiplexer::MuxChannel::OnWriteFailed() {
224   if (socket_)
225     socket_->OnWriteFailed();
226 }
227
228 void ChannelMultiplexer::MuxChannel::OnSocketDestroyed() {
229   DCHECK(socket_);
230   socket_ = NULL;
231 }
232
233 bool ChannelMultiplexer::MuxChannel::DoWrite(
234     scoped_ptr<MultiplexPacket> packet,
235     const base::Closure& done_task) {
236   packet->set_channel_id(send_id_);
237   if (!id_sent_) {
238     packet->set_channel_name(name_);
239     id_sent_ = true;
240   }
241   return multiplexer_->DoWrite(packet.Pass(), done_task);
242 }
243
244 int ChannelMultiplexer::MuxChannel::DoRead(net::IOBuffer* buffer,
245                                            int buffer_len) {
246   int pos = 0;
247   while (buffer_len > 0 && !pending_packets_.empty()) {
248     DCHECK(!pending_packets_.front()->is_empty());
249     int result = pending_packets_.front()->Read(
250         buffer->data() + pos, buffer_len);
251     DCHECK_LE(result, buffer_len);
252     pos += result;
253     buffer_len -= pos;
254     if (pending_packets_.front()->is_empty()) {
255       delete pending_packets_.front();
256       pending_packets_.erase(pending_packets_.begin());
257     }
258   }
259   return pos;
260 }
261
262 ChannelMultiplexer::MuxSocket::MuxSocket(MuxChannel* channel)
263     : channel_(channel),
264       read_buffer_size_(0),
265       write_pending_(false),
266       write_result_(0) {
267 }
268
269 ChannelMultiplexer::MuxSocket::~MuxSocket() {
270   channel_->OnSocketDestroyed();
271 }
272
273 int ChannelMultiplexer::MuxSocket::Read(
274     net::IOBuffer* buffer, int buffer_len,
275     const net::CompletionCallback& callback) {
276   DCHECK(CalledOnValidThread());
277   DCHECK(read_callback_.is_null());
278
279   int result = channel_->DoRead(buffer, buffer_len);
280   if (result == 0) {
281     read_buffer_ = buffer;
282     read_buffer_size_ = buffer_len;
283     read_callback_ = callback;
284     return net::ERR_IO_PENDING;
285   }
286   return result;
287 }
288
289 int ChannelMultiplexer::MuxSocket::Write(
290     net::IOBuffer* buffer, int buffer_len,
291     const net::CompletionCallback& callback) {
292   DCHECK(CalledOnValidThread());
293
294   scoped_ptr<MultiplexPacket> packet(new MultiplexPacket());
295   size_t size = std::min(kMaxPacketSize, buffer_len);
296   packet->mutable_data()->assign(buffer->data(), size);
297
298   write_pending_ = true;
299   bool result = channel_->DoWrite(packet.Pass(), base::Bind(
300       &ChannelMultiplexer::MuxSocket::OnWriteComplete, AsWeakPtr()));
301
302   if (!result) {
303     // Cannot complete the write, e.g. if the connection has been terminated.
304     return net::ERR_FAILED;
305   }
306
307   // OnWriteComplete() might be called above synchronously.
308   if (write_pending_) {
309     DCHECK(write_callback_.is_null());
310     write_callback_ = callback;
311     write_result_ = size;
312     return net::ERR_IO_PENDING;
313   }
314
315   return size;
316 }
317
318 void ChannelMultiplexer::MuxSocket::OnWriteComplete() {
319   write_pending_ = false;
320   if (!write_callback_.is_null()) {
321     net::CompletionCallback cb;
322     std::swap(cb, write_callback_);
323     cb.Run(write_result_);
324   }
325 }
326
327 void ChannelMultiplexer::MuxSocket::OnWriteFailed() {
328   if (!write_callback_.is_null()) {
329     net::CompletionCallback cb;
330     std::swap(cb, write_callback_);
331     cb.Run(net::ERR_FAILED);
332   }
333 }
334
335 void ChannelMultiplexer::MuxSocket::OnPacketReceived() {
336   if (!read_callback_.is_null()) {
337     int result = channel_->DoRead(read_buffer_.get(), read_buffer_size_);
338     read_buffer_ = NULL;
339     DCHECK_GT(result, 0);
340     net::CompletionCallback cb;
341     std::swap(cb, read_callback_);
342     cb.Run(result);
343   }
344 }
345
346 ChannelMultiplexer::ChannelMultiplexer(StreamChannelFactory* factory,
347                                        const std::string& base_channel_name)
348     : base_channel_factory_(factory),
349       base_channel_name_(base_channel_name),
350       next_channel_id_(0),
351       weak_factory_(this) {
352 }
353
354 ChannelMultiplexer::~ChannelMultiplexer() {
355   DCHECK(pending_channels_.empty());
356   STLDeleteValues(&channels_);
357
358   // Cancel creation of the base channel if it hasn't finished.
359   if (base_channel_factory_)
360     base_channel_factory_->CancelChannelCreation(base_channel_name_);
361 }
362
363 void ChannelMultiplexer::CreateChannel(const std::string& name,
364                                        const ChannelCreatedCallback& callback) {
365   if (base_channel_.get()) {
366     // Already have |base_channel_|. Create new multiplexed channel
367     // synchronously.
368     callback.Run(GetOrCreateChannel(name)->CreateSocket());
369   } else if (!base_channel_.get() && !base_channel_factory_) {
370     // Fail synchronously if we failed to create |base_channel_|.
371     callback.Run(nullptr);
372   } else {
373     // Still waiting for the |base_channel_|.
374     pending_channels_.push_back(PendingChannel(name, callback));
375
376     // If this is the first multiplexed channel then create the base channel.
377     if (pending_channels_.size() == 1U) {
378       base_channel_factory_->CreateChannel(
379           base_channel_name_,
380           base::Bind(&ChannelMultiplexer::OnBaseChannelReady,
381                      base::Unretained(this)));
382     }
383   }
384 }
385
386 void ChannelMultiplexer::CancelChannelCreation(const std::string& name) {
387   for (std::list<PendingChannel>::iterator it = pending_channels_.begin();
388        it != pending_channels_.end(); ++it) {
389     if (it->name == name) {
390       pending_channels_.erase(it);
391       return;
392     }
393   }
394 }
395
396 void ChannelMultiplexer::OnBaseChannelReady(
397     scoped_ptr<net::StreamSocket> socket) {
398   base_channel_factory_ = NULL;
399   base_channel_ = socket.Pass();
400
401   if (base_channel_.get()) {
402     // Initialize reader and writer.
403     reader_.Init(base_channel_.get(),
404                  base::Bind(&ChannelMultiplexer::OnIncomingPacket,
405                             base::Unretained(this)));
406     writer_.Init(base_channel_.get(),
407                  base::Bind(&ChannelMultiplexer::OnWriteFailed,
408                             base::Unretained(this)));
409   }
410
411   DoCreatePendingChannels();
412 }
413
414 void ChannelMultiplexer::DoCreatePendingChannels() {
415   if (pending_channels_.empty())
416     return;
417
418   // Every time this function is called it connects a single channel and posts a
419   // separate task to connect other channels. This is necessary because the
420   // callback may destroy the multiplexer or somehow else modify
421   // |pending_channels_| list (e.g. call CancelChannelCreation()).
422   base::ThreadTaskRunnerHandle::Get()->PostTask(
423       FROM_HERE, base::Bind(&ChannelMultiplexer::DoCreatePendingChannels,
424                             weak_factory_.GetWeakPtr()));
425
426   PendingChannel c = pending_channels_.front();
427   pending_channels_.erase(pending_channels_.begin());
428   scoped_ptr<net::StreamSocket> socket;
429   if (base_channel_.get())
430     socket = GetOrCreateChannel(c.name)->CreateSocket();
431   c.callback.Run(socket.Pass());
432 }
433
434 ChannelMultiplexer::MuxChannel* ChannelMultiplexer::GetOrCreateChannel(
435     const std::string& name) {
436   // Check if we already have a channel with the requested name.
437   std::map<std::string, MuxChannel*>::iterator it = channels_.find(name);
438   if (it != channels_.end())
439     return it->second;
440
441   // Create a new channel if we haven't found existing one.
442   MuxChannel* channel = new MuxChannel(this, name, next_channel_id_);
443   ++next_channel_id_;
444   channels_[channel->name()] = channel;
445   return channel;
446 }
447
448
449 void ChannelMultiplexer::OnWriteFailed(int error) {
450   for (std::map<std::string, MuxChannel*>::iterator it = channels_.begin();
451        it != channels_.end(); ++it) {
452     base::ThreadTaskRunnerHandle::Get()->PostTask(
453         FROM_HERE, base::Bind(&ChannelMultiplexer::NotifyWriteFailed,
454                               weak_factory_.GetWeakPtr(), it->second->name()));
455   }
456 }
457
458 void ChannelMultiplexer::NotifyWriteFailed(const std::string& name) {
459   std::map<std::string, MuxChannel*>::iterator it = channels_.find(name);
460   if (it != channels_.end()) {
461     it->second->OnWriteFailed();
462   }
463 }
464
465 void ChannelMultiplexer::OnIncomingPacket(scoped_ptr<MultiplexPacket> packet,
466                                           const base::Closure& done_task) {
467   DCHECK(packet->has_channel_id());
468   if (!packet->has_channel_id()) {
469     LOG(ERROR) << "Received packet without channel_id.";
470     done_task.Run();
471     return;
472   }
473
474   int receive_id = packet->channel_id();
475   MuxChannel* channel = NULL;
476   std::map<int, MuxChannel*>::iterator it =
477       channels_by_receive_id_.find(receive_id);
478   if (it != channels_by_receive_id_.end()) {
479     channel = it->second;
480   } else {
481     // This is a new |channel_id| we haven't seen before. Look it up by name.
482     if (!packet->has_channel_name()) {
483       LOG(ERROR) << "Received packet with unknown channel_id and "
484           "without channel_name.";
485       done_task.Run();
486       return;
487     }
488     channel = GetOrCreateChannel(packet->channel_name());
489     channel->set_receive_id(receive_id);
490     channels_by_receive_id_[receive_id] = channel;
491   }
492
493   channel->OnIncomingPacket(packet.Pass(), done_task);
494 }
495
496 bool ChannelMultiplexer::DoWrite(scoped_ptr<MultiplexPacket> packet,
497                                  const base::Closure& done_task) {
498   return writer_.Write(SerializeAndFrameMessage(*packet), done_task);
499 }
500
501 }  // namespace protocol
502 }  // namespace remoting