Upstream version 9.38.198.0
[platform/framework/web/crosswalk.git] / src / third_party / libjingle / source / talk / session / tunnel / pseudotcpchannel.cc
1 /*
2  * libjingle
3  * Copyright 2004--2006, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27
28 #include <string>
29 #include "pseudotcpchannel.h"
30 #include "talk/p2p/base/candidate.h"
31 #include "talk/p2p/base/transportchannel.h"
32 #include "webrtc/base/basictypes.h"
33 #include "webrtc/base/common.h"
34 #include "webrtc/base/logging.h"
35 #include "webrtc/base/scoped_ptr.h"
36 #include "webrtc/base/stringutils.h"
37
38 using namespace rtc;
39
40 namespace cricket {
41
42 extern const rtc::ConstantLabel SESSION_STATES[];
43
44 // MSG_WK_* - worker thread messages
45 // MSG_ST_* - stream thread messages
46 // MSG_SI_* - signal thread messages
47
48 enum {
49   MSG_WK_CLOCK = 1,
50   MSG_WK_PURGE,
51   MSG_ST_EVENT,
52   MSG_SI_DESTROYCHANNEL,
53   MSG_SI_DESTROY,
54 };
55
56 struct EventData : public MessageData {
57   int event, error;
58   EventData(int ev, int err = 0) : event(ev), error(err) { }
59 };
60
61 ///////////////////////////////////////////////////////////////////////////////
62 // PseudoTcpChannel::InternalStream
63 ///////////////////////////////////////////////////////////////////////////////
64
65 class PseudoTcpChannel::InternalStream : public StreamInterface {
66 public:
67   InternalStream(PseudoTcpChannel* parent);
68   virtual ~InternalStream();
69
70   virtual StreamState GetState() const;
71   virtual StreamResult Read(void* buffer, size_t buffer_len,
72                                        size_t* read, int* error);
73   virtual StreamResult Write(const void* data, size_t data_len,
74                                         size_t* written, int* error);
75   virtual void Close();
76
77 private:
78   // parent_ is accessed and modified exclusively on the event thread, to
79   // avoid thread contention.  This means that the PseudoTcpChannel cannot go
80   // away until after it receives a Close() from TunnelStream.
81   PseudoTcpChannel* parent_;
82 };
83
84 ///////////////////////////////////////////////////////////////////////////////
85 // PseudoTcpChannel
86 // Member object lifetime summaries:
87 //   session_ - passed in constructor, cleared when channel_ goes away.
88 //   channel_ - created in Connect, destroyed when session_ or tcp_ goes away.
89 //   tcp_ - created in Connect, destroyed when channel_ goes away, or connection
90 //     closes.
91 //   worker_thread_ - created when channel_ is created, purged when channel_ is
92 //     destroyed.
93 //   stream_ - created in GetStream, destroyed by owner at arbitrary time.
94 //   this - created in constructor, destroyed when worker_thread_ and stream_
95 //     are both gone.
96 ///////////////////////////////////////////////////////////////////////////////
97
98 //
99 // Signal thread methods
100 //
101
102 PseudoTcpChannel::PseudoTcpChannel(Thread* stream_thread, Session* session)
103   : signal_thread_(session->session_manager()->signaling_thread()),
104     worker_thread_(NULL),
105     stream_thread_(stream_thread),
106     session_(session), channel_(NULL), tcp_(NULL), stream_(NULL),
107     stream_readable_(false), pending_read_event_(false),
108     ready_to_connect_(false) {
109   ASSERT(signal_thread_->IsCurrent());
110   ASSERT(NULL != session_);
111 }
112
113 PseudoTcpChannel::~PseudoTcpChannel() {
114   ASSERT(signal_thread_->IsCurrent());
115   ASSERT(worker_thread_ == NULL);
116   ASSERT(session_ == NULL);
117   ASSERT(channel_ == NULL);
118   ASSERT(stream_ == NULL);
119   ASSERT(tcp_ == NULL);
120 }
121
122 bool PseudoTcpChannel::Connect(const std::string& content_name,
123                                const std::string& channel_name,
124                                int component) {
125   ASSERT(signal_thread_->IsCurrent());
126   CritScope lock(&cs_);
127
128   if (channel_)
129     return false;
130
131   ASSERT(session_ != NULL);
132   worker_thread_ = session_->session_manager()->worker_thread();
133   content_name_ = content_name;
134   channel_ = session_->CreateChannel(
135       content_name, channel_name, component);
136   channel_name_ = channel_name;
137   channel_->SetOption(Socket::OPT_DONTFRAGMENT, 1);
138
139   channel_->SignalDestroyed.connect(this,
140     &PseudoTcpChannel::OnChannelDestroyed);
141   channel_->SignalWritableState.connect(this,
142     &PseudoTcpChannel::OnChannelWritableState);
143   channel_->SignalReadPacket.connect(this,
144     &PseudoTcpChannel::OnChannelRead);
145   channel_->SignalRouteChange.connect(this,
146     &PseudoTcpChannel::OnChannelConnectionChanged);
147
148   ASSERT(tcp_ == NULL);
149   tcp_ = new PseudoTcp(this, 0);
150   if (session_->initiator()) {
151     // Since we may try several protocols and network adapters that won't work,
152     // waiting until we get our first writable notification before initiating
153     // TCP negotiation.
154     ready_to_connect_ = true;
155   }
156
157   return true;
158 }
159
160 StreamInterface* PseudoTcpChannel::GetStream() {
161   ASSERT(signal_thread_->IsCurrent());
162   CritScope lock(&cs_);
163   ASSERT(NULL != session_);
164   if (!stream_)
165     stream_ = new PseudoTcpChannel::InternalStream(this);
166   //TODO("should we disallow creation of new stream at some point?");
167   return stream_;
168 }
169
170 void PseudoTcpChannel::OnChannelDestroyed(TransportChannel* channel) {
171   LOG_F(LS_INFO) << "(" << channel->component() << ")";
172   ASSERT(signal_thread_->IsCurrent());
173   CritScope lock(&cs_);
174   ASSERT(channel == channel_);
175   signal_thread_->Clear(this, MSG_SI_DESTROYCHANNEL);
176   // When MSG_WK_PURGE is received, we know there will be no more messages from
177   // the worker thread.
178   worker_thread_->Clear(this, MSG_WK_CLOCK);
179   worker_thread_->Post(this, MSG_WK_PURGE);
180   session_ = NULL;
181   channel_ = NULL;
182   if ((stream_ != NULL)
183       && ((tcp_ == NULL) || (tcp_->State() != PseudoTcp::TCP_CLOSED)))
184     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, 0));
185   if (tcp_) {
186     tcp_->Close(true);
187     AdjustClock();
188   }
189   SignalChannelClosed(this);
190 }
191
192 void PseudoTcpChannel::OnSessionTerminate(Session* session) {
193   // When the session terminates before we even connected
194   CritScope lock(&cs_);
195   if (session_ != NULL && channel_ == NULL) {
196     ASSERT(session == session_);
197     ASSERT(worker_thread_ == NULL);
198     ASSERT(tcp_ == NULL);
199     LOG(LS_INFO) << "Destroying unconnected PseudoTcpChannel";
200     session_ = NULL;
201     if (stream_ != NULL)
202       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, -1));
203   }
204
205   // Even though session_ is being destroyed, we mustn't clear the pointer,
206   // since we'll need it to tear down channel_.
207   //
208   // TODO: Is it always the case that if channel_ != NULL then we'll get
209   // a channel-destroyed notification?
210 }
211
212 void PseudoTcpChannel::GetOption(PseudoTcp::Option opt, int* value) {
213   ASSERT(signal_thread_->IsCurrent());
214   CritScope lock(&cs_);
215   ASSERT(tcp_ != NULL);
216   tcp_->GetOption(opt, value);
217 }
218
219 void PseudoTcpChannel::SetOption(PseudoTcp::Option opt, int value) {
220   ASSERT(signal_thread_->IsCurrent());
221   CritScope lock(&cs_);
222   ASSERT(tcp_ != NULL);
223   tcp_->SetOption(opt, value);
224 }
225
226 //
227 // Stream thread methods
228 //
229
230 StreamState PseudoTcpChannel::GetState() const {
231   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
232   CritScope lock(&cs_);
233   if (!session_)
234     return SS_CLOSED;
235   if (!tcp_)
236     return SS_OPENING;
237   switch (tcp_->State()) {
238     case PseudoTcp::TCP_LISTEN:
239     case PseudoTcp::TCP_SYN_SENT:
240     case PseudoTcp::TCP_SYN_RECEIVED:
241       return SS_OPENING;
242     case PseudoTcp::TCP_ESTABLISHED:
243       return SS_OPEN;
244     case PseudoTcp::TCP_CLOSED:
245     default:
246       return SS_CLOSED;
247   }
248 }
249
250 StreamResult PseudoTcpChannel::Read(void* buffer, size_t buffer_len,
251                                     size_t* read, int* error) {
252   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
253   CritScope lock(&cs_);
254   if (!tcp_)
255     return SR_BLOCK;
256
257   stream_readable_ = false;
258   int result = tcp_->Recv(static_cast<char*>(buffer), buffer_len);
259   //LOG_F(LS_VERBOSE) << "Recv returned: " << result;
260   if (result > 0) {
261     if (read)
262       *read = result;
263     // PseudoTcp doesn't currently support repeated Readable signals.  Simulate
264     // them here.
265     stream_readable_ = true;
266     if (!pending_read_event_) {
267       pending_read_event_ = true;
268       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ), true);
269     }
270     return SR_SUCCESS;
271   } else if (IsBlockingError(tcp_->GetError())) {
272     return SR_BLOCK;
273   } else {
274     if (error)
275       *error = tcp_->GetError();
276     return SR_ERROR;
277   }
278   // This spot is never reached.
279 }
280
281 StreamResult PseudoTcpChannel::Write(const void* data, size_t data_len,
282                                      size_t* written, int* error) {
283   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
284   CritScope lock(&cs_);
285   if (!tcp_)
286     return SR_BLOCK;
287   int result = tcp_->Send(static_cast<const char*>(data), data_len);
288   //LOG_F(LS_VERBOSE) << "Send returned: " << result;
289   if (result > 0) {
290     if (written)
291       *written = result;
292     return SR_SUCCESS;
293   } else if (IsBlockingError(tcp_->GetError())) {
294     return SR_BLOCK;
295   } else {
296     if (error)
297       *error = tcp_->GetError();
298     return SR_ERROR;
299   }
300   // This spot is never reached.
301 }
302
303 void PseudoTcpChannel::Close() {
304   ASSERT(stream_ != NULL && stream_thread_->IsCurrent());
305   CritScope lock(&cs_);
306   stream_ = NULL;
307   // Clear out any pending event notifications
308   stream_thread_->Clear(this, MSG_ST_EVENT);
309   if (tcp_) {
310     tcp_->Close(false);
311     AdjustClock();
312   } else {
313     CheckDestroy();
314   }
315 }
316
317 //
318 // Worker thread methods
319 //
320
321 void PseudoTcpChannel::OnChannelWritableState(TransportChannel* channel) {
322   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
323   ASSERT(worker_thread_->IsCurrent());
324   CritScope lock(&cs_);
325   if (!channel_) {
326     LOG_F(LS_WARNING) << "NULL channel";
327     return;
328   }
329   ASSERT(channel == channel_);
330   if (!tcp_) {
331     LOG_F(LS_WARNING) << "NULL tcp";
332     return;
333   }
334   if (!ready_to_connect_ || !channel->writable())
335     return;
336
337   ready_to_connect_ = false;
338   tcp_->Connect();
339   AdjustClock();
340 }
341
342 void PseudoTcpChannel::OnChannelRead(TransportChannel* channel,
343                                      const char* data, size_t size,
344                                      const rtc::PacketTime& packet_time,
345                                      int flags) {
346   //LOG_F(LS_VERBOSE) << "(" << size << ")";
347   ASSERT(worker_thread_->IsCurrent());
348   CritScope lock(&cs_);
349   if (!channel_) {
350     LOG_F(LS_WARNING) << "NULL channel";
351     return;
352   }
353   ASSERT(channel == channel_);
354   if (!tcp_) {
355     LOG_F(LS_WARNING) << "NULL tcp";
356     return;
357   }
358   tcp_->NotifyPacket(data, size);
359   AdjustClock();
360 }
361
362 void PseudoTcpChannel::OnChannelConnectionChanged(TransportChannel* channel,
363                                                   const Candidate& candidate) {
364   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
365   ASSERT(worker_thread_->IsCurrent());
366   CritScope lock(&cs_);
367   if (!channel_) {
368     LOG_F(LS_WARNING) << "NULL channel";
369     return;
370   }
371   ASSERT(channel == channel_);
372   if (!tcp_) {
373     LOG_F(LS_WARNING) << "NULL tcp";
374     return;
375   }
376
377   uint16 mtu = 1280;  // safe default
378   int family = candidate.address().family();
379   Socket* socket =
380       worker_thread_->socketserver()->CreateAsyncSocket(family, SOCK_DGRAM);
381   rtc::scoped_ptr<Socket> mtu_socket(socket);
382   if (socket == NULL) {
383     LOG_F(LS_WARNING) << "Couldn't create socket while estimating MTU.";
384   } else {
385     if (mtu_socket->Connect(candidate.address()) < 0 ||
386         mtu_socket->EstimateMTU(&mtu) < 0) {
387       LOG_F(LS_WARNING) << "Failed to estimate MTU, error="
388                         << mtu_socket->GetError();
389     }
390   }
391
392   LOG_F(LS_VERBOSE) << "Using MTU of " << mtu << " bytes";
393   tcp_->NotifyMTU(mtu);
394   AdjustClock();
395 }
396
397 void PseudoTcpChannel::OnTcpOpen(PseudoTcp* tcp) {
398   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
399   ASSERT(cs_.CurrentThreadIsOwner());
400   ASSERT(worker_thread_->IsCurrent());
401   ASSERT(tcp == tcp_);
402   if (stream_) {
403     stream_readable_ = true;
404     pending_read_event_ = true;
405     stream_thread_->Post(this, MSG_ST_EVENT,
406                          new EventData(SE_OPEN | SE_READ | SE_WRITE));
407   }
408 }
409
410 void PseudoTcpChannel::OnTcpReadable(PseudoTcp* tcp) {
411   //LOG_F(LS_VERBOSE);
412   ASSERT(cs_.CurrentThreadIsOwner());
413   ASSERT(worker_thread_->IsCurrent());
414   ASSERT(tcp == tcp_);
415   if (stream_) {
416     stream_readable_ = true;
417     if (!pending_read_event_) {
418       pending_read_event_ = true;
419       stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_READ));
420     }
421   }
422 }
423
424 void PseudoTcpChannel::OnTcpWriteable(PseudoTcp* tcp) {
425   //LOG_F(LS_VERBOSE);
426   ASSERT(cs_.CurrentThreadIsOwner());
427   ASSERT(worker_thread_->IsCurrent());
428   ASSERT(tcp == tcp_);
429   if (stream_)
430     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_WRITE));
431 }
432
433 void PseudoTcpChannel::OnTcpClosed(PseudoTcp* tcp, uint32 nError) {
434   LOG_F(LS_VERBOSE) << "[" << channel_name_ << "]";
435   ASSERT(cs_.CurrentThreadIsOwner());
436   ASSERT(worker_thread_->IsCurrent());
437   ASSERT(tcp == tcp_);
438   if (stream_)
439     stream_thread_->Post(this, MSG_ST_EVENT, new EventData(SE_CLOSE, nError));
440 }
441
442 //
443 // Multi-thread methods
444 //
445
446 void PseudoTcpChannel::OnMessage(Message* pmsg) {
447   if (pmsg->message_id == MSG_WK_CLOCK) {
448
449     ASSERT(worker_thread_->IsCurrent());
450     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_WK_CLOCK)";
451     CritScope lock(&cs_);
452     if (tcp_) {
453       tcp_->NotifyClock(PseudoTcp::Now());
454       AdjustClock(false);
455     }
456
457   } else if (pmsg->message_id == MSG_WK_PURGE) {
458
459     ASSERT(worker_thread_->IsCurrent());
460     LOG_F(LS_INFO) << "(MSG_WK_PURGE)";
461     // At this point, we know there are no additional worker thread messages.
462     CritScope lock(&cs_);
463     ASSERT(NULL == session_);
464     ASSERT(NULL == channel_);
465     worker_thread_ = NULL;
466     CheckDestroy();
467
468   } else if (pmsg->message_id == MSG_ST_EVENT) {
469
470     ASSERT(stream_thread_->IsCurrent());
471     //LOG(LS_INFO) << "PseudoTcpChannel::OnMessage(MSG_ST_EVENT, "
472     //             << data->event << ", " << data->error << ")";
473     ASSERT(stream_ != NULL);
474     EventData* data = static_cast<EventData*>(pmsg->pdata);
475     if (data->event & SE_READ) {
476       CritScope lock(&cs_);
477       pending_read_event_ = false;
478     }
479     stream_->SignalEvent(stream_, data->event, data->error);
480     delete data;
481
482   } else if (pmsg->message_id == MSG_SI_DESTROYCHANNEL) {
483
484     ASSERT(signal_thread_->IsCurrent());
485     LOG_F(LS_INFO) << "(MSG_SI_DESTROYCHANNEL)";
486     ASSERT(session_ != NULL);
487     ASSERT(channel_ != NULL);
488     session_->DestroyChannel(content_name_, channel_->component());
489
490   } else if (pmsg->message_id == MSG_SI_DESTROY) {
491
492     ASSERT(signal_thread_->IsCurrent());
493     LOG_F(LS_INFO) << "(MSG_SI_DESTROY)";
494     // The message queue is empty, so it is safe to destroy ourselves.
495     delete this;
496
497   } else {
498     ASSERT(false);
499   }
500 }
501
502 IPseudoTcpNotify::WriteResult PseudoTcpChannel::TcpWritePacket(
503     PseudoTcp* tcp, const char* buffer, size_t len) {
504   ASSERT(cs_.CurrentThreadIsOwner());
505   ASSERT(tcp == tcp_);
506   ASSERT(NULL != channel_);
507   rtc::PacketOptions packet_options;
508   int sent = channel_->SendPacket(buffer, len, packet_options);
509   if (sent > 0) {
510     //LOG_F(LS_VERBOSE) << "(" << sent << ") Sent";
511     return IPseudoTcpNotify::WR_SUCCESS;
512   } else if (IsBlockingError(channel_->GetError())) {
513     LOG_F(LS_VERBOSE) << "Blocking";
514     return IPseudoTcpNotify::WR_SUCCESS;
515   } else if (channel_->GetError() == EMSGSIZE) {
516     LOG_F(LS_ERROR) << "EMSGSIZE";
517     return IPseudoTcpNotify::WR_TOO_LARGE;
518   } else {
519     PLOG(LS_ERROR, channel_->GetError()) << "PseudoTcpChannel::TcpWritePacket";
520     ASSERT(false);
521     return IPseudoTcpNotify::WR_FAIL;
522   }
523 }
524
525 void PseudoTcpChannel::AdjustClock(bool clear) {
526   ASSERT(cs_.CurrentThreadIsOwner());
527   ASSERT(NULL != tcp_);
528
529   long timeout = 0;
530   if (tcp_->GetNextClock(PseudoTcp::Now(), timeout)) {
531     ASSERT(NULL != channel_);
532     // Reset the next clock, by clearing the old and setting a new one.
533     if (clear)
534       worker_thread_->Clear(this, MSG_WK_CLOCK);
535     worker_thread_->PostDelayed(_max(timeout, 0L), this, MSG_WK_CLOCK);
536     return;
537   }
538
539   delete tcp_;
540   tcp_ = NULL;
541   ready_to_connect_ = false;
542
543   if (channel_) {
544     // If TCP has failed, no need for channel_ anymore
545     signal_thread_->Post(this, MSG_SI_DESTROYCHANNEL);
546   }
547 }
548
549 void PseudoTcpChannel::CheckDestroy() {
550   ASSERT(cs_.CurrentThreadIsOwner());
551   if ((worker_thread_ != NULL) || (stream_ != NULL))
552     return;
553   signal_thread_->Post(this, MSG_SI_DESTROY);
554 }
555
556 ///////////////////////////////////////////////////////////////////////////////
557 // PseudoTcpChannel::InternalStream
558 ///////////////////////////////////////////////////////////////////////////////
559
560 PseudoTcpChannel::InternalStream::InternalStream(PseudoTcpChannel* parent)
561   : parent_(parent) {
562 }
563
564 PseudoTcpChannel::InternalStream::~InternalStream() {
565   Close();
566 }
567
568 StreamState PseudoTcpChannel::InternalStream::GetState() const {
569   if (!parent_)
570     return SS_CLOSED;
571   return parent_->GetState();
572 }
573
574 StreamResult PseudoTcpChannel::InternalStream::Read(
575     void* buffer, size_t buffer_len, size_t* read, int* error) {
576   if (!parent_) {
577     if (error)
578       *error = ENOTCONN;
579     return SR_ERROR;
580   }
581   return parent_->Read(buffer, buffer_len, read, error);
582 }
583
584 StreamResult PseudoTcpChannel::InternalStream::Write(
585     const void* data, size_t data_len,  size_t* written, int* error) {
586   if (!parent_) {
587     if (error)
588       *error = ENOTCONN;
589     return SR_ERROR;
590   }
591   return parent_->Write(data, data_len, written, error);
592 }
593
594 void PseudoTcpChannel::InternalStream::Close() {
595   if (!parent_)
596     return;
597   parent_->Close();
598   parent_ = NULL;
599 }
600
601 ///////////////////////////////////////////////////////////////////////////////
602
603 } // namespace cricket