Upstream version 5.34.92.0
[platform/framework/web/crosswalk.git] / src / third_party / libjingle / source / talk / base / stream.cc
1 /*
2  * libjingle
3  * Copyright 2004 Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27
28 #if defined(POSIX)
29 #include <sys/file.h>
30 #endif  // POSIX
31 #include <sys/types.h>
32 #include <sys/stat.h>
33 #include <errno.h>
34 #include <string>
35 #include "talk/base/basictypes.h"
36 #include "talk/base/common.h"
37 #include "talk/base/logging.h"
38 #include "talk/base/messagequeue.h"
39 #include "talk/base/stream.h"
40 #include "talk/base/stringencode.h"
41 #include "talk/base/stringutils.h"
42 #include "talk/base/thread.h"
43 #include "talk/base/timeutils.h"
44
45 #ifdef WIN32
46 #include "talk/base/win32.h"
47 #define fileno _fileno
48 #endif
49
50 namespace talk_base {
51
52 ///////////////////////////////////////////////////////////////////////////////
53 // StreamInterface
54 ///////////////////////////////////////////////////////////////////////////////
55 StreamInterface::~StreamInterface() {
56 }
57
58 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
59                                        size_t* written, int* error) {
60   StreamResult result = SR_SUCCESS;
61   size_t total_written = 0, current_written;
62   while (total_written < data_len) {
63     result = Write(static_cast<const char*>(data) + total_written,
64                    data_len - total_written, &current_written, error);
65     if (result != SR_SUCCESS)
66       break;
67     total_written += current_written;
68   }
69   if (written)
70     *written = total_written;
71   return result;
72 }
73
74 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
75                                       size_t* read, int* error) {
76   StreamResult result = SR_SUCCESS;
77   size_t total_read = 0, current_read;
78   while (total_read < buffer_len) {
79     result = Read(static_cast<char*>(buffer) + total_read,
80                   buffer_len - total_read, &current_read, error);
81     if (result != SR_SUCCESS)
82       break;
83     total_read += current_read;
84   }
85   if (read)
86     *read = total_read;
87   return result;
88 }
89
90 StreamResult StreamInterface::ReadLine(std::string* line) {
91   line->clear();
92   StreamResult result = SR_SUCCESS;
93   while (true) {
94     char ch;
95     result = Read(&ch, sizeof(ch), NULL, NULL);
96     if (result != SR_SUCCESS) {
97       break;
98     }
99     if (ch == '\n') {
100       break;
101     }
102     line->push_back(ch);
103   }
104   if (!line->empty()) {   // give back the line we've collected so far with
105     result = SR_SUCCESS;  // a success code.  Otherwise return the last code
106   }
107   return result;
108 }
109
110 void StreamInterface::PostEvent(Thread* t, int events, int err) {
111   t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
112 }
113
114 void StreamInterface::PostEvent(int events, int err) {
115   PostEvent(Thread::Current(), events, err);
116 }
117
118 StreamInterface::StreamInterface() {
119 }
120
121 void StreamInterface::OnMessage(Message* msg) {
122   if (MSG_POST_EVENT == msg->message_id) {
123     StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
124     SignalEvent(this, pe->events, pe->error);
125     delete msg->pdata;
126   }
127 }
128
129 ///////////////////////////////////////////////////////////////////////////////
130 // StreamAdapterInterface
131 ///////////////////////////////////////////////////////////////////////////////
132
133 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
134                                                bool owned)
135     : stream_(stream), owned_(owned) {
136   if (NULL != stream_)
137     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
138 }
139
140 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
141   if (NULL != stream_)
142     stream_->SignalEvent.disconnect(this);
143   if (owned_)
144     delete stream_;
145   stream_ = stream;
146   owned_ = owned;
147   if (NULL != stream_)
148     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
149 }
150
151 StreamInterface* StreamAdapterInterface::Detach() {
152   if (NULL != stream_)
153     stream_->SignalEvent.disconnect(this);
154   StreamInterface* stream = stream_;
155   stream_ = NULL;
156   return stream;
157 }
158
159 StreamAdapterInterface::~StreamAdapterInterface() {
160   if (owned_)
161     delete stream_;
162 }
163
164 ///////////////////////////////////////////////////////////////////////////////
165 // StreamTap
166 ///////////////////////////////////////////////////////////////////////////////
167
168 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
169     : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
170         tap_error_(0) {
171   AttachTap(tap);
172 }
173
174 void StreamTap::AttachTap(StreamInterface* tap) {
175   tap_.reset(tap);
176 }
177
178 StreamInterface* StreamTap::DetachTap() {
179   return tap_.release();
180 }
181
182 StreamResult StreamTap::GetTapResult(int* error) {
183   if (error) {
184     *error = tap_error_;
185   }
186   return tap_result_;
187 }
188
189 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
190                              size_t* read, int* error) {
191   size_t backup_read;
192   if (!read) {
193     read = &backup_read;
194   }
195   StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
196                                                   read, error);
197   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
198     tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
199   }
200   return res;
201 }
202
203 StreamResult StreamTap::Write(const void* data, size_t data_len,
204                               size_t* written, int* error) {
205   size_t backup_written;
206   if (!written) {
207     written = &backup_written;
208   }
209   StreamResult res = StreamAdapterInterface::Write(data, data_len,
210                                                    written, error);
211   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
212     tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
213   }
214   return res;
215 }
216
217 ///////////////////////////////////////////////////////////////////////////////
218 // StreamSegment
219 ///////////////////////////////////////////////////////////////////////////////
220
221 StreamSegment::StreamSegment(StreamInterface* stream)
222     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
223     length_(SIZE_UNKNOWN) {
224   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
225   stream->GetPosition(&start_);
226 }
227
228 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
229     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
230     length_(length) {
231   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
232   stream->GetPosition(&start_);
233 }
234
235 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
236                                  size_t* read, int* error) {
237   if (SIZE_UNKNOWN != length_) {
238     if (pos_ >= length_)
239       return SR_EOS;
240     buffer_len = _min(buffer_len, length_ - pos_);
241   }
242   size_t backup_read;
243   if (!read) {
244     read = &backup_read;
245   }
246   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
247                                                      read, error);
248   if (SR_SUCCESS == result) {
249     pos_ += *read;
250   }
251   return result;
252 }
253
254 bool StreamSegment::SetPosition(size_t position) {
255   if (SIZE_UNKNOWN == start_)
256     return false;  // Not seekable
257   if ((SIZE_UNKNOWN != length_) && (position > length_))
258     return false;  // Seek past end of segment
259   if (!StreamAdapterInterface::SetPosition(start_ + position))
260     return false;
261   pos_ = position;
262   return true;
263 }
264
265 bool StreamSegment::GetPosition(size_t* position) const {
266   if (SIZE_UNKNOWN == start_)
267     return false;  // Not seekable
268   if (!StreamAdapterInterface::GetPosition(position))
269     return false;
270   if (position) {
271     ASSERT(*position >= start_);
272     *position -= start_;
273   }
274   return true;
275 }
276
277 bool StreamSegment::GetSize(size_t* size) const {
278   if (!StreamAdapterInterface::GetSize(size))
279     return false;
280   if (size) {
281     if (SIZE_UNKNOWN != start_) {
282       ASSERT(*size >= start_);
283       *size -= start_;
284     }
285     if (SIZE_UNKNOWN != length_) {
286       *size = _min(*size, length_);
287     }
288   }
289   return true;
290 }
291
292 bool StreamSegment::GetAvailable(size_t* size) const {
293   if (!StreamAdapterInterface::GetAvailable(size))
294     return false;
295   if (size && (SIZE_UNKNOWN != length_))
296     *size = _min(*size, length_ - pos_);
297   return true;
298 }
299
300 ///////////////////////////////////////////////////////////////////////////////
301 // NullStream
302 ///////////////////////////////////////////////////////////////////////////////
303
304 NullStream::NullStream() {
305 }
306
307 NullStream::~NullStream() {
308 }
309
310 StreamState NullStream::GetState() const {
311   return SS_OPEN;
312 }
313
314 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
315                               size_t* read, int* error) {
316   if (error) *error = -1;
317   return SR_ERROR;
318 }
319
320 StreamResult NullStream::Write(const void* data, size_t data_len,
321                                size_t* written, int* error) {
322   if (written) *written = data_len;
323   return SR_SUCCESS;
324 }
325
326 void NullStream::Close() {
327 }
328
329 ///////////////////////////////////////////////////////////////////////////////
330 // FileStream
331 ///////////////////////////////////////////////////////////////////////////////
332
333 FileStream::FileStream() : file_(NULL) {
334 }
335
336 FileStream::~FileStream() {
337   FileStream::Close();
338 }
339
340 bool FileStream::Open(const std::string& filename, const char* mode,
341                       int* error) {
342   Close();
343 #ifdef WIN32
344   std::wstring wfilename;
345   if (Utf8ToWindowsFilename(filename, &wfilename)) {
346     file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
347   } else {
348     if (error) {
349       *error = -1;
350       return false;
351     }
352   }
353 #else
354   file_ = fopen(filename.c_str(), mode);
355 #endif
356   if (!file_ && error) {
357     *error = errno;
358   }
359   return (file_ != NULL);
360 }
361
362 bool FileStream::OpenShare(const std::string& filename, const char* mode,
363                            int shflag, int* error) {
364   Close();
365 #ifdef WIN32
366   std::wstring wfilename;
367   if (Utf8ToWindowsFilename(filename, &wfilename)) {
368     file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
369     if (!file_ && error) {
370       *error = errno;
371       return false;
372     }
373     return file_ != NULL;
374   } else {
375     if (error) {
376       *error = -1;
377     }
378     return false;
379   }
380 #else
381   return Open(filename, mode, error);
382 #endif
383 }
384
385 bool FileStream::DisableBuffering() {
386   if (!file_)
387     return false;
388   return (setvbuf(file_, NULL, _IONBF, 0) == 0);
389 }
390
391 StreamState FileStream::GetState() const {
392   return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
393 }
394
395 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
396                               size_t* read, int* error) {
397   if (!file_)
398     return SR_EOS;
399   size_t result = fread(buffer, 1, buffer_len, file_);
400   if ((result == 0) && (buffer_len > 0)) {
401     if (feof(file_))
402       return SR_EOS;
403     if (error)
404       *error = errno;
405     return SR_ERROR;
406   }
407   if (read)
408     *read = result;
409   return SR_SUCCESS;
410 }
411
412 StreamResult FileStream::Write(const void* data, size_t data_len,
413                                size_t* written, int* error) {
414   if (!file_)
415     return SR_EOS;
416   size_t result = fwrite(data, 1, data_len, file_);
417   if ((result == 0) && (data_len > 0)) {
418     if (error)
419       *error = errno;
420     return SR_ERROR;
421   }
422   if (written)
423     *written = result;
424   return SR_SUCCESS;
425 }
426
427 void FileStream::Close() {
428   if (file_) {
429     DoClose();
430     file_ = NULL;
431   }
432 }
433
434 bool FileStream::SetPosition(size_t position) {
435   if (!file_)
436     return false;
437   return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
438 }
439
440 bool FileStream::GetPosition(size_t* position) const {
441   ASSERT(NULL != position);
442   if (!file_)
443     return false;
444   long result = ftell(file_);
445   if (result < 0)
446     return false;
447   if (position)
448     *position = result;
449   return true;
450 }
451
452 bool FileStream::GetSize(size_t* size) const {
453   ASSERT(NULL != size);
454   if (!file_)
455     return false;
456   struct stat file_stats;
457   if (fstat(fileno(file_), &file_stats) != 0)
458     return false;
459   if (size)
460     *size = file_stats.st_size;
461   return true;
462 }
463
464 bool FileStream::GetAvailable(size_t* size) const {
465   ASSERT(NULL != size);
466   if (!GetSize(size))
467     return false;
468   long result = ftell(file_);
469   if (result < 0)
470     return false;
471   if (size)
472     *size -= result;
473   return true;
474 }
475
476 bool FileStream::ReserveSize(size_t size) {
477   // TODO: extend the file to the proper length
478   return true;
479 }
480
481 bool FileStream::GetSize(const std::string& filename, size_t* size) {
482   struct stat file_stats;
483   if (stat(filename.c_str(), &file_stats) != 0)
484     return false;
485   *size = file_stats.st_size;
486   return true;
487 }
488
489 bool FileStream::Flush() {
490   if (file_) {
491     return (0 == fflush(file_));
492   }
493   // try to flush empty file?
494   ASSERT(false);
495   return false;
496 }
497
498 #if defined(POSIX) && !defined(__native_client__)
499
500 bool FileStream::TryLock() {
501   if (file_ == NULL) {
502     // Stream not open.
503     ASSERT(false);
504     return false;
505   }
506
507   return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
508 }
509
510 bool FileStream::Unlock() {
511   if (file_ == NULL) {
512     // Stream not open.
513     ASSERT(false);
514     return false;
515   }
516
517   return flock(fileno(file_), LOCK_UN) == 0;
518 }
519
520 #endif
521
522 void FileStream::DoClose() {
523   fclose(file_);
524 }
525
526 CircularFileStream::CircularFileStream(size_t max_size)
527   : max_write_size_(max_size),
528     position_(0),
529     marked_position_(max_size / 2),
530     last_write_position_(0),
531     read_segment_(READ_LATEST),
532     read_segment_available_(0) {
533 }
534
535 bool CircularFileStream::Open(
536     const std::string& filename, const char* mode, int* error) {
537   if (!FileStream::Open(filename.c_str(), mode, error))
538     return false;
539
540   if (strchr(mode, "r") != NULL) {  // Opened in read mode.
541     // Check if the buffer has been overwritten and determine how to read the
542     // log in time sequence.
543     size_t file_size;
544     GetSize(&file_size);
545     if (file_size == position_) {
546       // The buffer has not been overwritten yet. Read 0 .. file_size
547       read_segment_ = READ_LATEST;
548       read_segment_available_ = file_size;
549     } else {
550       // The buffer has been over written. There are three segments: The first
551       // one is 0 .. marked_position_, which is the marked earliest log. The
552       // second one is position_ .. file_size, which is the middle log. The
553       // last one is marked_position_ .. position_, which is the latest log.
554       read_segment_ = READ_MARKED;
555       read_segment_available_ = marked_position_;
556       last_write_position_ = position_;
557     }
558
559     // Read from the beginning.
560     position_ = 0;
561     SetPosition(position_);
562   }
563
564   return true;
565 }
566
567 StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
568                                       size_t* read, int* error) {
569   if (read_segment_available_ == 0) {
570     size_t file_size;
571     switch (read_segment_) {
572       case READ_MARKED:  // Finished READ_MARKED and start READ_MIDDLE.
573         read_segment_ = READ_MIDDLE;
574         position_ = last_write_position_;
575         SetPosition(position_);
576         GetSize(&file_size);
577         read_segment_available_ = file_size - position_;
578         break;
579
580       case READ_MIDDLE:  // Finished READ_MIDDLE and start READ_LATEST.
581         read_segment_ = READ_LATEST;
582         position_ = marked_position_;
583         SetPosition(position_);
584         read_segment_available_ = last_write_position_ - position_;
585         break;
586
587       default:  // Finished READ_LATEST and return EOS.
588         return talk_base::SR_EOS;
589     }
590   }
591
592   size_t local_read;
593   if (!read) read = &local_read;
594
595   size_t to_read = talk_base::_min(buffer_len, read_segment_available_);
596   talk_base::StreamResult result
597     = talk_base::FileStream::Read(buffer, to_read, read, error);
598   if (result == talk_base::SR_SUCCESS) {
599     read_segment_available_ -= *read;
600     position_ += *read;
601   }
602   return result;
603 }
604
605 StreamResult CircularFileStream::Write(const void* data, size_t data_len,
606                                        size_t* written, int* error) {
607   if (position_ >= max_write_size_) {
608     ASSERT(position_ == max_write_size_);
609     position_ = marked_position_;
610     SetPosition(position_);
611   }
612
613   size_t local_written;
614   if (!written) written = &local_written;
615
616   size_t to_eof = max_write_size_ - position_;
617   size_t to_write = talk_base::_min(data_len, to_eof);
618   talk_base::StreamResult result
619     = talk_base::FileStream::Write(data, to_write, written, error);
620   if (result == talk_base::SR_SUCCESS) {
621     position_ += *written;
622   }
623   return result;
624 }
625
626 AsyncWriteStream::~AsyncWriteStream() {
627   write_thread_->Clear(this, 0, NULL);
628   ClearBufferAndWrite();
629
630   CritScope cs(&crit_stream_);
631   stream_.reset();
632 }
633
634 // This is needed by some stream writers, such as RtpDumpWriter.
635 bool AsyncWriteStream::GetPosition(size_t* position) const {
636   CritScope cs(&crit_stream_);
637   return stream_->GetPosition(position);
638 }
639
640 // This is needed by some stream writers, such as the plugin log writers.
641 StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
642                                     size_t* read, int* error) {
643   CritScope cs(&crit_stream_);
644   return stream_->Read(buffer, buffer_len, read, error);
645 }
646
647 void AsyncWriteStream::Close() {
648   if (state_ == SS_CLOSED) {
649     return;
650   }
651
652   write_thread_->Clear(this, 0, NULL);
653   ClearBufferAndWrite();
654
655   CritScope cs(&crit_stream_);
656   stream_->Close();
657   state_ = SS_CLOSED;
658 }
659
660 StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
661                                      size_t* written, int* error) {
662   if (state_ == SS_CLOSED) {
663     return SR_ERROR;
664   }
665
666   size_t previous_buffer_length = 0;
667   {
668     CritScope cs(&crit_buffer_);
669     previous_buffer_length = buffer_.length();
670     buffer_.AppendData(data, data_len);
671   }
672
673   if (previous_buffer_length == 0) {
674     // If there's stuff already in the buffer, then we already called
675     // Post and the write_thread_ hasn't pulled it out yet, so we
676     // don't need to re-Post.
677     write_thread_->Post(this, 0, NULL);
678   }
679   // Return immediately, assuming that it works.
680   if (written) {
681     *written = data_len;
682   }
683   return SR_SUCCESS;
684 }
685
686 void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) {
687   ClearBufferAndWrite();
688 }
689
690 bool AsyncWriteStream::Flush() {
691   if (state_ == SS_CLOSED) {
692     return false;
693   }
694
695   ClearBufferAndWrite();
696
697   CritScope cs(&crit_stream_);
698   return stream_->Flush();
699 }
700
701 void AsyncWriteStream::ClearBufferAndWrite() {
702   Buffer to_write;
703   {
704     CritScope cs_buffer(&crit_buffer_);
705     buffer_.TransferTo(&to_write);
706   }
707
708   if (to_write.length() > 0) {
709     CritScope cs(&crit_stream_);
710     stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
711   }
712 }
713
714 #if defined(POSIX) && !defined(__native_client__)
715
716 // Have to identically rewrite the FileStream destructor or else it would call
717 // the base class's Close() instead of the sub-class's.
718 POpenStream::~POpenStream() {
719   POpenStream::Close();
720 }
721
722 bool POpenStream::Open(const std::string& subcommand,
723                        const char* mode,
724                        int* error) {
725   Close();
726   file_ = popen(subcommand.c_str(), mode);
727   if (file_ == NULL) {
728     if (error)
729       *error = errno;
730     return false;
731   }
732   return true;
733 }
734
735 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
736                             int shflag, int* error) {
737   return Open(subcommand, mode, error);
738 }
739
740 void POpenStream::DoClose() {
741   wait_status_ = pclose(file_);
742 }
743
744 #endif
745
746 ///////////////////////////////////////////////////////////////////////////////
747 // MemoryStream
748 ///////////////////////////////////////////////////////////////////////////////
749
750 MemoryStreamBase::MemoryStreamBase()
751   : buffer_(NULL), buffer_length_(0), data_length_(0),
752     seek_position_(0) {
753 }
754
755 StreamState MemoryStreamBase::GetState() const {
756   return SS_OPEN;
757 }
758
759 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
760                                     size_t* bytes_read, int* error) {
761   if (seek_position_ >= data_length_) {
762     return SR_EOS;
763   }
764   size_t available = data_length_ - seek_position_;
765   if (bytes > available) {
766     // Read partial buffer
767     bytes = available;
768   }
769   memcpy(buffer, &buffer_[seek_position_], bytes);
770   seek_position_ += bytes;
771   if (bytes_read) {
772     *bytes_read = bytes;
773   }
774   return SR_SUCCESS;
775 }
776
777 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
778                                      size_t* bytes_written, int* error) {
779   size_t available = buffer_length_ - seek_position_;
780   if (0 == available) {
781     // Increase buffer size to the larger of:
782     // a) new position rounded up to next 256 bytes
783     // b) double the previous length
784     size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
785                                     buffer_length_ * 2);
786     StreamResult result = DoReserve(new_buffer_length, error);
787     if (SR_SUCCESS != result) {
788       return result;
789     }
790     ASSERT(buffer_length_ >= new_buffer_length);
791     available = buffer_length_ - seek_position_;
792   }
793
794   if (bytes > available) {
795     bytes = available;
796   }
797   memcpy(&buffer_[seek_position_], buffer, bytes);
798   seek_position_ += bytes;
799   if (data_length_ < seek_position_) {
800     data_length_ = seek_position_;
801   }
802   if (bytes_written) {
803     *bytes_written = bytes;
804   }
805   return SR_SUCCESS;
806 }
807
808 void MemoryStreamBase::Close() {
809   // nothing to do
810 }
811
812 bool MemoryStreamBase::SetPosition(size_t position) {
813   if (position > data_length_)
814     return false;
815   seek_position_ = position;
816   return true;
817 }
818
819 bool MemoryStreamBase::GetPosition(size_t* position) const {
820   if (position)
821     *position = seek_position_;
822   return true;
823 }
824
825 bool MemoryStreamBase::GetSize(size_t* size) const {
826   if (size)
827     *size = data_length_;
828   return true;
829 }
830
831 bool MemoryStreamBase::GetAvailable(size_t* size) const {
832   if (size)
833     *size = data_length_ - seek_position_;
834   return true;
835 }
836
837 bool MemoryStreamBase::ReserveSize(size_t size) {
838   return (SR_SUCCESS == DoReserve(size, NULL));
839 }
840
841 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
842   return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
843 }
844
845 ///////////////////////////////////////////////////////////////////////////////
846
847 MemoryStream::MemoryStream()
848   : buffer_alloc_(NULL) {
849 }
850
851 MemoryStream::MemoryStream(const char* data)
852   : buffer_alloc_(NULL) {
853   SetData(data, strlen(data));
854 }
855
856 MemoryStream::MemoryStream(const void* data, size_t length)
857   : buffer_alloc_(NULL) {
858   SetData(data, length);
859 }
860
861 MemoryStream::~MemoryStream() {
862   delete [] buffer_alloc_;
863 }
864
865 void MemoryStream::SetData(const void* data, size_t length) {
866   data_length_ = buffer_length_ = length;
867   delete [] buffer_alloc_;
868   buffer_alloc_ = new char[buffer_length_ + kAlignment];
869   buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
870   memcpy(buffer_, data, data_length_);
871   seek_position_ = 0;
872 }
873
874 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
875   if (buffer_length_ >= size)
876     return SR_SUCCESS;
877
878   if (char* new_buffer_alloc = new char[size + kAlignment]) {
879     char* new_buffer = reinterpret_cast<char*>(
880         ALIGNP(new_buffer_alloc, kAlignment));
881     memcpy(new_buffer, buffer_, data_length_);
882     delete [] buffer_alloc_;
883     buffer_alloc_ = new_buffer_alloc;
884     buffer_ = new_buffer;
885     buffer_length_ = size;
886     return SR_SUCCESS;
887   }
888
889   if (error) {
890     *error = ENOMEM;
891   }
892   return SR_ERROR;
893 }
894
895 ///////////////////////////////////////////////////////////////////////////////
896
897 ExternalMemoryStream::ExternalMemoryStream() {
898 }
899
900 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
901   SetData(data, length);
902 }
903
904 ExternalMemoryStream::~ExternalMemoryStream() {
905 }
906
907 void ExternalMemoryStream::SetData(void* data, size_t length) {
908   data_length_ = buffer_length_ = length;
909   buffer_ = static_cast<char*>(data);
910   seek_position_ = 0;
911 }
912
913 ///////////////////////////////////////////////////////////////////////////////
914 // FifoBuffer
915 ///////////////////////////////////////////////////////////////////////////////
916
917 FifoBuffer::FifoBuffer(size_t size)
918     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
919       data_length_(0), read_position_(0), owner_(Thread::Current()) {
920   // all events are done on the owner_ thread
921 }
922
923 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
924     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
925       data_length_(0), read_position_(0), owner_(owner) {
926   // all events are done on the owner_ thread
927 }
928
929 FifoBuffer::~FifoBuffer() {
930 }
931
932 bool FifoBuffer::GetBuffered(size_t* size) const {
933   CritScope cs(&crit_);
934   *size = data_length_;
935   return true;
936 }
937
938 bool FifoBuffer::SetCapacity(size_t size) {
939   CritScope cs(&crit_);
940   if (data_length_ > size) {
941     return false;
942   }
943
944   if (size != buffer_length_) {
945     char* buffer = new char[size];
946     const size_t copy = data_length_;
947     const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
948     memcpy(buffer, &buffer_[read_position_], tail_copy);
949     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
950     buffer_.reset(buffer);
951     read_position_ = 0;
952     buffer_length_ = size;
953   }
954   return true;
955 }
956
957 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
958                                     size_t offset, size_t* bytes_read) {
959   CritScope cs(&crit_);
960   return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
961 }
962
963 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
964                                      size_t offset, size_t* bytes_written) {
965   CritScope cs(&crit_);
966   return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
967 }
968
969 StreamState FifoBuffer::GetState() const {
970   return state_;
971 }
972
973 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
974                               size_t* bytes_read, int* error) {
975   CritScope cs(&crit_);
976   const bool was_writable = data_length_ < buffer_length_;
977   size_t copy = 0;
978   StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
979
980   if (result == SR_SUCCESS) {
981     // If read was successful then adjust the read position and number of
982     // bytes buffered.
983     read_position_ = (read_position_ + copy) % buffer_length_;
984     data_length_ -= copy;
985     if (bytes_read) {
986       *bytes_read = copy;
987     }
988
989     // if we were full before, and now we're not, post an event
990     if (!was_writable && copy > 0) {
991       PostEvent(owner_, SE_WRITE, 0);
992     }
993   }
994   return result;
995 }
996
997 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
998                                size_t* bytes_written, int* error) {
999   CritScope cs(&crit_);
1000
1001   const bool was_readable = (data_length_ > 0);
1002   size_t copy = 0;
1003   StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
1004
1005   if (result == SR_SUCCESS) {
1006     // If write was successful then adjust the number of readable bytes.
1007     data_length_ += copy;
1008     if (bytes_written) {
1009       *bytes_written = copy;
1010     }
1011
1012     // if we didn't have any data to read before, and now we do, post an event
1013     if (!was_readable && copy > 0) {
1014       PostEvent(owner_, SE_READ, 0);
1015     }
1016   }
1017   return result;
1018 }
1019
1020 void FifoBuffer::Close() {
1021   CritScope cs(&crit_);
1022   state_ = SS_CLOSED;
1023 }
1024
1025 const void* FifoBuffer::GetReadData(size_t* size) {
1026   CritScope cs(&crit_);
1027   *size = (read_position_ + data_length_ <= buffer_length_) ?
1028       data_length_ : buffer_length_ - read_position_;
1029   return &buffer_[read_position_];
1030 }
1031
1032 void FifoBuffer::ConsumeReadData(size_t size) {
1033   CritScope cs(&crit_);
1034   ASSERT(size <= data_length_);
1035   const bool was_writable = data_length_ < buffer_length_;
1036   read_position_ = (read_position_ + size) % buffer_length_;
1037   data_length_ -= size;
1038   if (!was_writable && size > 0) {
1039     PostEvent(owner_, SE_WRITE, 0);
1040   }
1041 }
1042
1043 void* FifoBuffer::GetWriteBuffer(size_t* size) {
1044   CritScope cs(&crit_);
1045   if (state_ == SS_CLOSED) {
1046     return NULL;
1047   }
1048
1049   // if empty, reset the write position to the beginning, so we can get
1050   // the biggest possible block
1051   if (data_length_ == 0) {
1052     read_position_ = 0;
1053   }
1054
1055   const size_t write_position = (read_position_ + data_length_)
1056       % buffer_length_;
1057   *size = (write_position > read_position_ || data_length_ == 0) ?
1058       buffer_length_ - write_position : read_position_ - write_position;
1059   return &buffer_[write_position];
1060 }
1061
1062 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1063   CritScope cs(&crit_);
1064   ASSERT(size <= buffer_length_ - data_length_);
1065   const bool was_readable = (data_length_ > 0);
1066   data_length_ += size;
1067   if (!was_readable && size > 0) {
1068     PostEvent(owner_, SE_READ, 0);
1069   }
1070 }
1071
1072 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1073   CritScope cs(&crit_);
1074   *size = buffer_length_ - data_length_;
1075   return true;
1076 }
1077
1078 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1079                                           size_t bytes,
1080                                           size_t offset,
1081                                           size_t* bytes_read) {
1082   if (offset >= data_length_) {
1083     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1084   }
1085
1086   const size_t available = data_length_ - offset;
1087   const size_t read_position = (read_position_ + offset) % buffer_length_;
1088   const size_t copy = _min(bytes, available);
1089   const size_t tail_copy = _min(copy, buffer_length_ - read_position);
1090   char* const p = static_cast<char*>(buffer);
1091   memcpy(p, &buffer_[read_position], tail_copy);
1092   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
1093
1094   if (bytes_read) {
1095     *bytes_read = copy;
1096   }
1097   return SR_SUCCESS;
1098 }
1099
1100 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1101                                            size_t bytes,
1102                                            size_t offset,
1103                                            size_t* bytes_written) {
1104   if (state_ == SS_CLOSED) {
1105     return SR_EOS;
1106   }
1107
1108   if (data_length_ + offset >= buffer_length_) {
1109     return SR_BLOCK;
1110   }
1111
1112   const size_t available = buffer_length_ - data_length_ - offset;
1113   const size_t write_position = (read_position_ + data_length_ + offset)
1114       % buffer_length_;
1115   const size_t copy = _min(bytes, available);
1116   const size_t tail_copy = _min(copy, buffer_length_ - write_position);
1117   const char* const p = static_cast<const char*>(buffer);
1118   memcpy(&buffer_[write_position], p, tail_copy);
1119   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1120
1121   if (bytes_written) {
1122     *bytes_written = copy;
1123   }
1124   return SR_SUCCESS;
1125 }
1126
1127
1128
1129 ///////////////////////////////////////////////////////////////////////////////
1130 // LoggingAdapter
1131 ///////////////////////////////////////////////////////////////////////////////
1132
1133 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1134                                const std::string& label, bool hex_mode)
1135     : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
1136   set_label(label);
1137 }
1138
1139 void LoggingAdapter::set_label(const std::string& label) {
1140   label_.assign("[");
1141   label_.append(label);
1142   label_.append("]");
1143 }
1144
1145 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1146                                   size_t* read, int* error) {
1147   size_t local_read; if (!read) read = &local_read;
1148   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1149                                                      error);
1150   if (result == SR_SUCCESS) {
1151     LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
1152   }
1153   return result;
1154 }
1155
1156 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1157                                    size_t* written, int* error) {
1158   size_t local_written;
1159   if (!written) written = &local_written;
1160   StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1161                                                       error);
1162   if (result == SR_SUCCESS) {
1163     LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
1164                  &lms_);
1165   }
1166   return result;
1167 }
1168
1169 void LoggingAdapter::Close() {
1170   LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1171   LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1172   LOG_V(level_) << label_ << " Closed locally";
1173   StreamAdapterInterface::Close();
1174 }
1175
1176 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1177   if (events & SE_OPEN) {
1178     LOG_V(level_) << label_ << " Open";
1179   } else if (events & SE_CLOSE) {
1180     LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1181     LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1182     LOG_V(level_) << label_ << " Closed with error: " << err;
1183   }
1184   StreamAdapterInterface::OnEvent(stream, events, err);
1185 }
1186
1187 ///////////////////////////////////////////////////////////////////////////////
1188 // StringStream - Reads/Writes to an external std::string
1189 ///////////////////////////////////////////////////////////////////////////////
1190
1191 StringStream::StringStream(std::string& str)
1192     : str_(str), read_pos_(0), read_only_(false) {
1193 }
1194
1195 StringStream::StringStream(const std::string& str)
1196     : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
1197 }
1198
1199 StreamState StringStream::GetState() const {
1200   return SS_OPEN;
1201 }
1202
1203 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1204                                       size_t* read, int* error) {
1205   size_t available = _min(buffer_len, str_.size() - read_pos_);
1206   if (!available)
1207     return SR_EOS;
1208   memcpy(buffer, str_.data() + read_pos_, available);
1209   read_pos_ += available;
1210   if (read)
1211     *read = available;
1212   return SR_SUCCESS;
1213 }
1214
1215 StreamResult StringStream::Write(const void* data, size_t data_len,
1216                                       size_t* written, int* error) {
1217   if (read_only_) {
1218     if (error) {
1219       *error = -1;
1220     }
1221     return SR_ERROR;
1222   }
1223   str_.append(static_cast<const char*>(data),
1224               static_cast<const char*>(data) + data_len);
1225   if (written)
1226     *written = data_len;
1227   return SR_SUCCESS;
1228 }
1229
1230 void StringStream::Close() {
1231 }
1232
1233 bool StringStream::SetPosition(size_t position) {
1234   if (position > str_.size())
1235     return false;
1236   read_pos_ = position;
1237   return true;
1238 }
1239
1240 bool StringStream::GetPosition(size_t* position) const {
1241   if (position)
1242     *position = read_pos_;
1243   return true;
1244 }
1245
1246 bool StringStream::GetSize(size_t* size) const {
1247   if (size)
1248     *size = str_.size();
1249   return true;
1250 }
1251
1252 bool StringStream::GetAvailable(size_t* size) const {
1253   if (size)
1254     *size = str_.size() - read_pos_;
1255   return true;
1256 }
1257
1258 bool StringStream::ReserveSize(size_t size) {
1259   if (read_only_)
1260     return false;
1261   str_.reserve(size);
1262   return true;
1263 }
1264
1265 ///////////////////////////////////////////////////////////////////////////////
1266 // StreamReference
1267 ///////////////////////////////////////////////////////////////////////////////
1268
1269 StreamReference::StreamReference(StreamInterface* stream)
1270     : StreamAdapterInterface(stream, false) {
1271   // owner set to false so the destructor does not free the stream.
1272   stream_ref_count_ = new StreamRefCount(stream);
1273 }
1274
1275 StreamInterface* StreamReference::NewReference() {
1276   stream_ref_count_->AddReference();
1277   return new StreamReference(stream_ref_count_, stream());
1278 }
1279
1280 StreamReference::~StreamReference() {
1281   stream_ref_count_->Release();
1282 }
1283
1284 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1285                                  StreamInterface* stream)
1286     : StreamAdapterInterface(stream, false),
1287       stream_ref_count_(stream_ref_count) {
1288 }
1289
1290 ///////////////////////////////////////////////////////////////////////////////
1291
1292 StreamResult Flow(StreamInterface* source,
1293                   char* buffer, size_t buffer_len,
1294                   StreamInterface* sink,
1295                   size_t* data_len /* = NULL */) {
1296   ASSERT(buffer_len > 0);
1297
1298   StreamResult result;
1299   size_t count, read_pos, write_pos;
1300   if (data_len) {
1301     read_pos = *data_len;
1302   } else {
1303     read_pos = 0;
1304   }
1305
1306   bool end_of_stream = false;
1307   do {
1308     // Read until buffer is full, end of stream, or error
1309     while (!end_of_stream && (read_pos < buffer_len)) {
1310       result = source->Read(buffer + read_pos, buffer_len - read_pos,
1311                             &count, NULL);
1312       if (result == SR_EOS) {
1313         end_of_stream = true;
1314       } else if (result != SR_SUCCESS) {
1315         if (data_len) {
1316           *data_len = read_pos;
1317         }
1318         return result;
1319       } else {
1320         read_pos += count;
1321       }
1322     }
1323
1324     // Write until buffer is empty, or error (including end of stream)
1325     write_pos = 0;
1326     while (write_pos < read_pos) {
1327       result = sink->Write(buffer + write_pos, read_pos - write_pos,
1328                            &count, NULL);
1329       if (result != SR_SUCCESS) {
1330         if (data_len) {
1331           *data_len = read_pos - write_pos;
1332           if (write_pos > 0) {
1333             memmove(buffer, buffer + write_pos, *data_len);
1334           }
1335         }
1336         return result;
1337       }
1338       write_pos += count;
1339     }
1340
1341     read_pos = 0;
1342   } while (!end_of_stream);
1343
1344   if (data_len) {
1345     *data_len = 0;
1346   }
1347   return SR_SUCCESS;
1348 }
1349
1350 ///////////////////////////////////////////////////////////////////////////////
1351
1352 }  // namespace talk_base