3 * Copyright 2004 Google Inc.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include <sys/types.h>
35 #include "talk/base/basictypes.h"
36 #include "talk/base/common.h"
37 #include "talk/base/logging.h"
38 #include "talk/base/messagequeue.h"
39 #include "talk/base/stream.h"
40 #include "talk/base/stringencode.h"
41 #include "talk/base/stringutils.h"
42 #include "talk/base/thread.h"
43 #include "talk/base/timeutils.h"
46 #include "talk/base/win32.h"
47 #define fileno _fileno
52 ///////////////////////////////////////////////////////////////////////////////
54 ///////////////////////////////////////////////////////////////////////////////
// StreamInterface destructor.
55 StreamInterface::~StreamInterface() {
58 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
59 size_t* written, int* error) {
60 StreamResult result = SR_SUCCESS;
61 size_t total_written = 0, current_written;
62 while (total_written < data_len) {
63 result = Write(static_cast<const char*>(data) + total_written,
64 data_len - total_written, ¤t_written, error);
65 if (result != SR_SUCCESS)
67 total_written += current_written;
70 *written = total_written;
74 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
75 size_t* read, int* error) {
76 StreamResult result = SR_SUCCESS;
77 size_t total_read = 0, current_read;
78 while (total_read < buffer_len) {
79 result = Read(static_cast<char*>(buffer) + total_read,
80 buffer_len - total_read, ¤t_read, error);
81 if (result != SR_SUCCESS)
83 total_read += current_read;
// Reads a line of text into |line|. Each Read() call requests sizeof(ch)
// bytes and passes NULL for both the byte-count and error out-parameters.
// If any characters were collected, the result is forced to SR_SUCCESS so the
// partial line is handed back; otherwise the last Read() code is kept.
90 StreamResult StreamInterface::ReadLine(std::string* line) {
92 StreamResult result = SR_SUCCESS;
95 result = Read(&ch, sizeof(ch), NULL, NULL);
96 if (result != SR_SUCCESS) {
104 if (!line->empty()) { // give back the line we've collected so far with
105 result = SR_SUCCESS; // a success code. Otherwise return the last code
// Posts a MSG_POST_EVENT message to thread |t| with this stream as the
// handler; |events| and |err| travel in a heap-allocated StreamEventData.
110 void StreamInterface::PostEvent(Thread* t, int events, int err) {
111 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
// Convenience overload: posts the event to Thread::Current().
114 void StreamInterface::PostEvent(int events, int err) {
115 PostEvent(Thread::Current(), events, err);
// Default constructor.
118 StreamInterface::StreamInterface() {
// Message handler. For MSG_POST_EVENT, the payload is cast back to
// StreamEventData and SignalEvent is fired with its events/error fields.
121 void StreamInterface::OnMessage(Message* msg) {
122 if (MSG_POST_EVENT == msg->message_id) {
123 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
124 SignalEvent(this, pe->events, pe->error);
129 ///////////////////////////////////////////////////////////////////////////////
130 // StreamAdapterInterface
131 ///////////////////////////////////////////////////////////////////////////////
// Wraps |stream|, remembering the |owned| flag, and connects OnEvent to the
// wrapped stream's SignalEvent.
133 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
135 : stream_(stream), owned_(owned) {
137 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
// Disconnects from stream_'s SignalEvent and later connects OnEvent again.
// NOTE(review): presumably stream_/owned_ are replaced with the arguments in
// between -- that part is not visible here; confirm.
140 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
142 stream_->SignalEvent.disconnect(this);
148 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
// Disconnects from stream_'s SignalEvent and takes a local copy of the
// stream_ pointer. NOTE(review): presumably that copy is what gets returned
// -- confirm.
151 StreamInterface* StreamAdapterInterface::Detach() {
153 stream_->SignalEvent.disconnect(this);
154 StreamInterface* stream = stream_;
// Destructor.
159 StreamAdapterInterface::~StreamAdapterInterface() {
164 ///////////////////////////////////////////////////////////////////////////////
166 ///////////////////////////////////////////////////////////////////////////////
// Adapts |stream|; tap_ starts out empty and tap_result_ starts as
// SR_SUCCESS.
168 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
169 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
// NOTE(review): presumably installs |tap| as the stream that receives the
// mirrored data -- body not visible here; confirm.
174 void StreamTap::AttachTap(StreamInterface* tap) {
// Gives up ownership of the tap stream held in tap_ and returns it.
178 StreamInterface* StreamTap::DetachTap() {
179 return tap_.release();
// NOTE(review): presumably reports tap_result_ (and tap_error_ via |error|)
// -- body not visible here; confirm.
182 StreamResult StreamTap::GetTapResult(int* error) {
// Reads through the base adapter. When that read succeeded and tap_result_ is
// still SR_SUCCESS, the *read bytes just received are copied into the tap
// with WriteAll(); the outcome is stored in tap_result_ / tap_error_.
189 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
190 size_t* read, int* error) {
195 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
197 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
198 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
// Writes through the base adapter. When that write succeeded and tap_result_
// is still SR_SUCCESS, the first *written bytes of |data| are copied into the
// tap with WriteAll(); the outcome is stored in tap_result_ / tap_error_.
// |written| may be redirected to the local backup_written so that *written
// can be read afterwards.
203 StreamResult StreamTap::Write(const void* data, size_t data_len,
204 size_t* written, int* error) {
205 size_t backup_written;
207 written = &backup_written;
209 StreamResult res = StreamAdapterInterface::Write(data, data_len,
211 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
212 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
217 ///////////////////////////////////////////////////////////////////////////////
219 ///////////////////////////////////////////////////////////////////////////////
// Segment of unknown length: starts at the wrapped stream's current position
// (start_ stays SIZE_UNKNOWN if GetPosition fails), pos_ = 0.
221 StreamSegment::StreamSegment(StreamInterface* stream)
222 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
223 length_(SIZE_UNKNOWN) {
224 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
225 stream->GetPosition(&start_);
// Overload taking an explicit segment |length|; start_ is captured from the
// wrapped stream's current position in the same best-effort way.
228 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
229 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
231 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
232 stream->GetPosition(&start_);
// Reads from the wrapped stream. When length_ is known, buffer_len is first
// clamped to the bytes remaining in the segment (length_ - pos_).
235 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
236 size_t* read, int* error) {
237 if (SIZE_UNKNOWN != length_) {
240 buffer_len = _min(buffer_len, length_ - pos_);
246 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
248 if (SR_SUCCESS == result) {
// Seeks to |position| relative to the start of the segment. Fails when the
// segment start is unknown or |position| lies beyond a known length_;
// otherwise seeks the wrapped stream to start_ + position.
254 bool StreamSegment::SetPosition(size_t position) {
255 if (SIZE_UNKNOWN == start_)
256 return false; // Not seekable
257 if ((SIZE_UNKNOWN != length_) && (position > length_))
258 return false; // Seek past end of segment
259 if (!StreamAdapterInterface::SetPosition(start_ + position))
// Queries the wrapped stream's position. Fails when start_ is unknown or the
// wrapped GetPosition fails; the value obtained is asserted to be >= start_.
265 bool StreamSegment::GetPosition(size_t* position) const {
266 if (SIZE_UNKNOWN == start_)
267 return false; // Not seekable
268 if (!StreamAdapterInterface::GetPosition(position))
271 ASSERT(*position >= start_);
// Queries the wrapped stream's size. With a known start_ the size is asserted
// to be >= start_; with a known length_ the result is capped at length_.
277 bool StreamSegment::GetSize(size_t* size) const {
278 if (!StreamAdapterInterface::GetSize(size))
281 if (SIZE_UNKNOWN != start_) {
282 ASSERT(*size >= start_);
285 if (SIZE_UNKNOWN != length_) {
286 *size = _min(*size, length_);
// Queries the wrapped stream's available byte count and, when |size| is
// non-NULL and length_ is known, caps it at the bytes left in the segment.
292 bool StreamSegment::GetAvailable(size_t* size) const {
293 if (!StreamAdapterInterface::GetAvailable(size))
295 if (size && (SIZE_UNKNOWN != length_))
296 *size = _min(*size, length_ - pos_);
300 ///////////////////////////////////////////////////////////////////////////////
302 ///////////////////////////////////////////////////////////////////////////////
// Constructor.
304 NullStream::NullStream() {
// Destructor.
307 NullStream::~NullStream() {
// Reports the stream state (body not visible here).
310 StreamState NullStream::GetState() const {
// Read: stores -1 in *error when |error| is supplied.
314 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
315 size_t* read, int* error) {
316 if (error) *error = -1;
// Write: reports the full |data_len| as written when |written| is supplied.
320 StreamResult NullStream::Write(const void* data, size_t data_len,
321 size_t* written, int* error) {
322 if (written) *written = data_len;
// Close.
326 void NullStream::Close() {
329 ///////////////////////////////////////////////////////////////////////////////
331 ///////////////////////////////////////////////////////////////////////////////
// Constructs a stream with no file attached (file_ == NULL).
333 FileStream::FileStream() : file_(NULL) {
// Destructor.
336 FileStream::~FileStream() {
// Opens |filename| with the stdio-style |mode| string. Two open paths are
// visible: _wfopen() on a wide filename converted from UTF-8, and plain
// fopen(). NOTE(review): presumably selected by platform #ifdefs that are not
// visible here -- confirm. Returns true when file_ is non-NULL afterwards.
340 bool FileStream::Open(const std::string& filename, const char* mode,
344 std::wstring wfilename;
345 if (Utf8ToWindowsFilename(filename, &wfilename)) {
346 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
354 file_ = fopen(filename.c_str(), mode);
356 if (!file_ && error) {
359 return (file_ != NULL);
// Opens |filename| with sharing flag |shflag|. One visible path converts the
// name to a wide string and calls _wfsopen(); another simply forwards to
// Open(), ignoring |shflag|. NOTE(review): presumably chosen by platform
// #ifdefs not visible here -- confirm.
362 bool FileStream::OpenShare(const std::string& filename, const char* mode,
363 int shflag, int* error) {
366 std::wstring wfilename;
367 if (Utf8ToWindowsFilename(filename, &wfilename)) {
368 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
369 if (!file_ && error) {
373 return file_ != NULL;
381 return Open(filename, mode, error);
// Switches file_ to unbuffered mode via setvbuf(_IONBF); true on success.
385 bool FileStream::DisableBuffering() {
388 return (setvbuf(file_, NULL, _IONBF, 0) == 0);
// SS_CLOSED when no file is attached, SS_OPEN otherwise.
391 StreamState FileStream::GetState() const {
392 return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
// Reads up to |buffer_len| bytes with fread(). A zero-byte result for a
// non-empty request is handled as a special case (handling not visible here).
395 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
396 size_t* read, int* error) {
399 size_t result = fread(buffer, 1, buffer_len, file_);
400 if ((result == 0) && (buffer_len > 0)) {
// Writes up to |data_len| bytes with fwrite(). A zero-byte result for a
// non-empty request is handled as a special case (handling not visible here).
412 StreamResult FileStream::Write(const void* data, size_t data_len,
413 size_t* written, int* error) {
416 size_t result = fwrite(data, 1, data_len, file_);
417 if ((result == 0) && (data_len > 0)) {
// Close.
427 void FileStream::Close() {
434 bool FileStream::SetPosition(size_t position) {
437 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
// Obtains the current file offset through ftell(). |position| must be
// non-NULL (asserted).
440 bool FileStream::GetPosition(size_t* position) const {
441 ASSERT(NULL != position);
444 long result = ftell(file_);
// Stores the file's size, taken from fstat() on file_'s descriptor, in
// *size. |size| must be non-NULL (asserted).
452 bool FileStream::GetSize(size_t* size) const {
453 ASSERT(NULL != size);
456 struct stat file_stats;
457 if (fstat(fileno(file_), &file_stats) != 0)
460 *size = file_stats.st_size;
// Computes the bytes available for reading; the visible part queries the
// current offset with ftell(). |size| must be non-NULL (asserted).
464 bool FileStream::GetAvailable(size_t* size) const {
465 ASSERT(NULL != size);
468 long result = ftell(file_);
// Reserving space is not implemented yet (see TODO).
476 bool FileStream::ReserveSize(size_t size) {
477 // TODO: extend the file to the proper length
// Path-based overload: stores the size of the file named |filename|, taken
// from stat(), in *size.
481 bool FileStream::GetSize(const std::string& filename, size_t* size) {
482 struct stat file_stats;
483 if (stat(filename.c_str(), &file_stats) != 0)
485 *size = file_stats.st_size;
// Flushes buffered output with fflush(); true when fflush() returns 0.
489 bool FileStream::Flush() {
491 return (0 == fflush(file_));
493 // try to flush empty file?
// Attempts a non-blocking exclusive flock() on the file; true on success.
500 bool FileStream::TryLock() {
507 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
// Releases the flock() lock on the file; true on success.
510 bool FileStream::Unlock() {
517 return flock(fileno(file_), LOCK_UN) == 0;
// Close hook (body not visible here).
522 void FileStream::DoClose() {
// Creates a circular file stream limited to |max_size| bytes. The marked
// position starts at half of |max_size|, and reading starts in the
// READ_LATEST segment with nothing available.
526 CircularFileStream::CircularFileStream(size_t max_size)
527 : max_write_size_(max_size),
529 marked_position_(max_size / 2),
530 last_write_position_(0),
531 read_segment_(READ_LATEST),
532 read_segment_available_(0) {
535 bool CircularFileStream::Open(
536 const std::string& filename, const char* mode, int* error) {
537 if (!FileStream::Open(filename.c_str(), mode, error))
540 if (strchr(mode, "r") != NULL) { // Opened in read mode.
541 // Check if the buffer has been overwritten and determine how to read the
542 // log in time sequence.
545 if (file_size == position_) {
546 // The buffer has not been overwritten yet. Read 0 .. file_size
547 read_segment_ = READ_LATEST;
548 read_segment_available_ = file_size;
550 // The buffer has been over written. There are three segments: The first
551 // one is 0 .. marked_position_, which is the marked earliest log. The
552 // second one is position_ .. file_size, which is the middle log. The
553 // last one is marked_position_ .. position_, which is the latest log.
554 read_segment_ = READ_MARKED;
555 read_segment_available_ = marked_position_;
556 last_write_position_ = position_;
559 // Read from the beginning.
561 SetPosition(position_);
// Reads from the current segment of the circular log. When the current
// segment is exhausted the reader advances READ_MARKED -> READ_MIDDLE ->
// READ_LATEST, seeking to the start of the next segment and recomputing how
// many bytes it holds; once READ_LATEST is exhausted SR_EOS is returned.
// Each read is limited to the bytes left in the current segment, and a
// successful read reduces read_segment_available_ by *read.
567 StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
568 size_t* read, int* error) {
569 if (read_segment_available_ == 0) {
571 switch (read_segment_) {
572 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE.
573 read_segment_ = READ_MIDDLE;
574 position_ = last_write_position_;
575 SetPosition(position_);
577 read_segment_available_ = file_size - position_;
580 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST.
581 read_segment_ = READ_LATEST;
582 position_ = marked_position_;
583 SetPosition(position_);
584 read_segment_available_ = last_write_position_ - position_;
587 default: // Finished READ_LATEST and return EOS.
588 return talk_base::SR_EOS;
593 if (!read) read = &local_read;
595 size_t to_read = talk_base::_min(buffer_len, read_segment_available_);
596 talk_base::StreamResult result
597 = talk_base::FileStream::Read(buffer, to_read, read, error);
598 if (result == talk_base::SR_SUCCESS) {
599 read_segment_available_ -= *read;
// Writes into the circular file. Once position_ has reached max_write_size_
// the write position wraps back to marked_position_. A single call writes at
// most the bytes that fit before max_write_size_; on success position_
// advances by *written. |written| may be NULL (a local is substituted).
605 StreamResult CircularFileStream::Write(const void* data, size_t data_len,
606 size_t* written, int* error) {
607 if (position_ >= max_write_size_) {
608 ASSERT(position_ == max_write_size_);
609 position_ = marked_position_;
610 SetPosition(position_);
613 size_t local_written;
614 if (!written) written = &local_written;
616 size_t to_eof = max_write_size_ - position_;
617 size_t to_write = talk_base::_min(data_len, to_eof);
618 talk_base::StreamResult result
619 = talk_base::FileStream::Write(data, to_write, written, error);
620 if (result == talk_base::SR_SUCCESS) {
621 position_ += *written;
// Destructor: removes this object's pending messages from write_thread_,
// writes out whatever is still buffered, then takes crit_stream_ for the
// remaining teardown.
626 AsyncWriteStream::~AsyncWriteStream() {
627 write_thread_->Clear(this, 0, NULL);
628 ClearBufferAndWrite();
630 CritScope cs(&crit_stream_);
634 // This is needed by some stream writers, such as RtpDumpWriter.
// Forwards to the wrapped stream's GetPosition() while holding crit_stream_.
635 bool AsyncWriteStream::GetPosition(size_t* position) const {
636 CritScope cs(&crit_stream_);
637 return stream_->GetPosition(position);
640 // This is needed by some stream writers, such as the plugin log writers.
// Forwards to the wrapped stream's Read() while holding crit_stream_.
641 StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
642 size_t* read, int* error) {
643 CritScope cs(&crit_stream_);
644 return stream_->Read(buffer, buffer_len, read, error);
// Closes the stream. A stream whose state_ is already SS_CLOSED is handled
// up front; otherwise pending messages are cleared from write_thread_, the
// buffered data is written out, and crit_stream_ is taken for the rest.
647 void AsyncWriteStream::Close() {
648 if (state_ == SS_CLOSED) {
652 write_thread_->Clear(this, 0, NULL);
653 ClearBufferAndWrite();
655 CritScope cs(&crit_stream_);
// Queues |data| for asynchronous writing. The bytes are appended to buffer_
// under crit_buffer_; a message is posted to write_thread_ only when the
// buffer was empty beforehand, since a non-empty buffer means a post is
// already outstanding. The call returns without waiting for the real write.
660 StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
661 size_t* written, int* error) {
662 if (state_ == SS_CLOSED) {
666 size_t previous_buffer_length = 0;
668 CritScope cs(&crit_buffer_);
669 previous_buffer_length = buffer_.length();
670 buffer_.AppendData(data, data_len);
673 if (previous_buffer_length == 0) {
674 // If there's stuff already in the buffer, then we already called
675 // Post and the write_thread_ hasn't pulled it out yet, so we
676 // don't need to re-Post.
677 write_thread_->Post(this, 0, NULL);
679 // Return immediately, assuming that it works.
// Message handler: drains the buffer to the wrapped stream.
686 void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) {
687 ClearBufferAndWrite();
// Writes out any buffered data, then flushes the wrapped stream while holding
// crit_stream_. A closed stream (state_ == SS_CLOSED) is handled up front.
690 bool AsyncWriteStream::Flush() {
691 if (state_ == SS_CLOSED) {
695 ClearBufferAndWrite();
697 CritScope cs(&crit_stream_);
698 return stream_->Flush();
// Moves the pending bytes out of buffer_ (under crit_buffer_) into a local
// buffer and, if there are any, writes them to the wrapped stream with
// WriteAll() under crit_stream_. The WriteAll() result is not examined.
701 void AsyncWriteStream::ClearBufferAndWrite() {
704 CritScope cs_buffer(&crit_buffer_);
705 buffer_.TransferTo(&to_write);
708 if (to_write.length() > 0) {
709 CritScope cs(&crit_stream_);
710 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
716 // Have to identically rewrite the FileStream destructor or else it would call
717 // the base class's Close() instead of the sub-class's.
718 POpenStream::~POpenStream() {
719 POpenStream::Close();
// Starts |subcommand| with popen() and keeps the resulting pipe in file_.
722 bool POpenStream::Open(const std::string& subcommand,
726 file_ = popen(subcommand.c_str(), mode);
// Sharing flags do not apply here: |shflag| is ignored and the call forwards
// to Open().
735 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
736 int shflag, int* error) {
737 return Open(subcommand, mode, error);
// Closes the pipe with pclose() and records its return value in
// wait_status_.
740 void POpenStream::DoClose() {
741 wait_status_ = pclose(file_);
746 ///////////////////////////////////////////////////////////////////////////////
748 ///////////////////////////////////////////////////////////////////////////////
// Starts with no buffer attached and zero buffer/data lengths.
750 MemoryStreamBase::MemoryStreamBase()
751 : buffer_(NULL), buffer_length_(0), data_length_(0),
// Reports the stream state (body not visible here).
755 StreamState MemoryStreamBase::GetState() const {
// Copies up to |bytes| bytes from the buffer at seek_position_ into |buffer|
// and advances seek_position_. A seek position at or past data_length_ is
// handled before any copy; a request larger than what remains is treated as a
// partial read.
759 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
760 size_t* bytes_read, int* error) {
761 if (seek_position_ >= data_length_) {
764 size_t available = data_length_ - seek_position_;
765 if (bytes > available) {
766 // Read partial buffer
769 memcpy(buffer, &buffer_[seek_position_], bytes);
770 seek_position_ += bytes;
// Copies |bytes| bytes from |buffer| into the stream at seek_position_. When
// no room is left, more space is first requested through DoReserve(); the new
// size is the larger of the size described in (a) and one further operand
// that is not visible here (the comment describes it as (b)). After the copy
// seek_position_ advances and data_length_ grows if the write extended the
// data.
777 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
778 size_t* bytes_written, int* error) {
779 size_t available = buffer_length_ - seek_position_;
780 if (0 == available) {
781 // Increase buffer size to the larger of:
782 // a) new position rounded up to next 256 bytes
783 // b) double the previous length
784 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
786 StreamResult result = DoReserve(new_buffer_length, error);
787 if (SR_SUCCESS != result) {
790 ASSERT(buffer_length_ >= new_buffer_length);
791 available = buffer_length_ - seek_position_;
794 if (bytes > available) {
797 memcpy(&buffer_[seek_position_], buffer, bytes);
798 seek_position_ += bytes;
799 if (data_length_ < seek_position_) {
800 data_length_ = seek_position_;
803 *bytes_written = bytes;
// Close.
808 void MemoryStreamBase::Close() {
// Moves the seek position to |position|; positions beyond data_length_ are
// rejected before the assignment.
812 bool MemoryStreamBase::SetPosition(size_t position) {
813 if (position > data_length_)
815 seek_position_ = position;
// Stores the current seek position in *position.
819 bool MemoryStreamBase::GetPosition(size_t* position) const {
821 *position = seek_position_;
// Stores the number of valid data bytes in *size.
825 bool MemoryStreamBase::GetSize(size_t* size) const {
827 *size = data_length_;
// Stores the number of bytes between the seek position and the end of the
// data in *size.
831 bool MemoryStreamBase::GetAvailable(size_t* size) const {
833 *size = data_length_ - seek_position_;
// True when DoReserve() can provide at least |size| bytes of buffer.
837 bool MemoryStreamBase::ReserveSize(size_t size) {
838 return (SR_SUCCESS == DoReserve(size, NULL));
// Base implementation cannot grow: succeeds only if the existing buffer is
// already large enough, otherwise reports SR_EOS.
841 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
842 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
845 ///////////////////////////////////////////////////////////////////////////////
// Creates an empty stream with no allocation.
847 MemoryStream::MemoryStream()
848 : buffer_alloc_(NULL) {
// Creates a stream initialised with a copy of the NUL-terminated string
// |data| (terminator excluded, as strlen() is used for the length).
851 MemoryStream::MemoryStream(const char* data)
852 : buffer_alloc_(NULL) {
853 SetData(data, strlen(data));
// Creates a stream initialised with a copy of |length| bytes from |data|.
856 MemoryStream::MemoryStream(const void* data, size_t length)
857 : buffer_alloc_(NULL) {
858 SetData(data, length);
// Frees the owned allocation.
861 MemoryStream::~MemoryStream() {
862 delete [] buffer_alloc_;
// Replaces the stream contents with a copy of |length| bytes from |data|.
// The old allocation is freed, a new one with kAlignment spare bytes is made,
// and buffer_ is set to an aligned address inside it via ALIGNP.
865 void MemoryStream::SetData(const void* data, size_t length) {
866 data_length_ = buffer_length_ = length;
867 delete [] buffer_alloc_;
868 buffer_alloc_ = new char[buffer_length_ + kAlignment];
869 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
870 memcpy(buffer_, data, data_length_);
// Grows the buffer so it can hold at least |size| bytes. If the current
// buffer is too small, a new aligned allocation is made, the existing
// data_length_ bytes are copied over, and the old allocation is freed.
// NOTE(review): a plain `new char[...]` signals failure by throwing rather
// than returning NULL, so the if-test on the allocation result cannot observe
// an allocation failure unless the build changes that behaviour -- confirm.
874 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
875 if (buffer_length_ >= size)
878 if (char* new_buffer_alloc = new char[size + kAlignment]) {
879 char* new_buffer = reinterpret_cast<char*>(
880 ALIGNP(new_buffer_alloc, kAlignment));
881 memcpy(new_buffer, buffer_, data_length_);
882 delete [] buffer_alloc_;
883 buffer_alloc_ = new_buffer_alloc;
884 buffer_ = new_buffer;
885 buffer_length_ = size;
895 ///////////////////////////////////////////////////////////////////////////////
// Creates a stream with no external buffer attached.
897 ExternalMemoryStream::ExternalMemoryStream() {
// Creates a stream over the caller's memory block |data| of |length| bytes.
900 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
901 SetData(data, length);
// Destructor.
904 ExternalMemoryStream::~ExternalMemoryStream() {
// Points the stream at |data| without copying it; both the data length and
// the buffer length become |length|.
907 void ExternalMemoryStream::SetData(void* data, size_t length) {
908 data_length_ = buffer_length_ = length;
909 buffer_ = static_cast<char*>(data);
913 ///////////////////////////////////////////////////////////////////////////////
915 ///////////////////////////////////////////////////////////////////////////////
// Creates an open, empty FIFO with a |size|-byte buffer; events are delivered
// on the constructing thread (Thread::Current()).
917 FifoBuffer::FifoBuffer(size_t size)
918 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
919 data_length_(0), read_position_(0), owner_(Thread::Current()) {
920 // all events are done on the owner_ thread
// Same as above, but events are delivered on the supplied |owner| thread.
923 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
924 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
925 data_length_(0), read_position_(0), owner_(owner) {
926 // all events are done on the owner_ thread
// Destructor.
929 FifoBuffer::~FifoBuffer() {
// Stores the number of bytes currently buffered in *size (under crit_).
932 bool FifoBuffer::GetBuffered(size_t* size) const {
933 CritScope cs(&crit_);
934 *size = data_length_;
// Resizes the FIFO's storage to |size| bytes (under crit_). A |size| smaller
// than the buffered data is handled as a special case up front. When the size
// actually changes, the buffered bytes are copied into a new array in two
// pieces -- from read_position_ to the end of the old buffer, then the
// wrapped-around remainder from its start -- so the data ends up contiguous.
938 bool FifoBuffer::SetCapacity(size_t size) {
939 CritScope cs(&crit_);
940 if (data_length_ > size) {
944 if (size != buffer_length_) {
945 char* buffer = new char[size];
946 const size_t copy = data_length_;
947 const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
948 memcpy(buffer, &buffer_[read_position_], tail_copy);
949 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
950 buffer_.reset(buffer);
952 buffer_length_ = size;
// Locked wrapper: takes crit_ and forwards to ReadOffsetLocked().
957 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
958 size_t offset, size_t* bytes_read) {
959 CritScope cs(&crit_);
960 return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
// Locked wrapper: takes crit_ and forwards to WriteOffsetLocked().
963 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
964 size_t offset, size_t* bytes_written) {
965 CritScope cs(&crit_);
966 return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
// Reports the stream state (body not visible here).
969 StreamState FifoBuffer::GetState() const {
973 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
974 size_t* bytes_read, int* error) {
975 CritScope cs(&crit_);
976 const bool was_writable = data_length_ < buffer_length_;
978 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©);
980 if (result == SR_SUCCESS) {
981 // If read was successful then adjust the read position and number of
983 read_position_ = (read_position_ + copy) % buffer_length_;
984 data_length_ -= copy;
989 // if we were full before, and now we're not, post an event
990 if (!was_writable && copy > 0) {
991 PostEvent(owner_, SE_WRITE, 0);
997 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
998 size_t* bytes_written, int* error) {
999 CritScope cs(&crit_);
1001 const bool was_readable = (data_length_ > 0);
1003 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©);
1005 if (result == SR_SUCCESS) {
1006 // If write was successful then adjust the number of readable bytes.
1007 data_length_ += copy;
1008 if (bytes_written) {
1009 *bytes_written = copy;
1012 // if we didn't have any data to read before, and now we do, post an event
1013 if (!was_readable && copy > 0) {
1014 PostEvent(owner_, SE_READ, 0);
// Close (runs under crit_; remainder of the body not visible here).
1020 void FifoBuffer::Close() {
1021 CritScope cs(&crit_);
// Returns a pointer to the first buffered byte and stores in *size the length
// of the contiguous run starting there: all buffered data when it does not
// wrap, otherwise only the part up to the end of the storage.
1025 const void* FifoBuffer::GetReadData(size_t* size) {
1026 CritScope cs(&crit_);
1027 *size = (read_position_ + data_length_ <= buffer_length_) ?
1028 data_length_ : buffer_length_ - read_position_;
1029 return &buffer_[read_position_];
// Marks |size| buffered bytes as consumed (asserted to be <= data_length_):
// advances read_position_ with wrap-around and shrinks data_length_. If the
// FIFO was full beforehand and |size| > 0, an SE_WRITE event is posted to the
// owner thread.
1032 void FifoBuffer::ConsumeReadData(size_t size) {
1033 CritScope cs(&crit_);
1034 ASSERT(size <= data_length_);
1035 const bool was_writable = data_length_ < buffer_length_;
1036 read_position_ = (read_position_ + size) % buffer_length_;
1037 data_length_ -= size;
1038 if (!was_writable && size > 0) {
1039 PostEvent(owner_, SE_WRITE, 0);
// Returns a pointer to the position where the next byte would be written and
// stores in *size the length of the contiguous free run starting there: up to
// the end of the storage when the write position is ahead of the read
// position (or the FIFO is empty), otherwise up to the read position. A
// closed FIFO is handled as a special case up front.
1043 void* FifoBuffer::GetWriteBuffer(size_t* size) {
1044 CritScope cs(&crit_);
1045 if (state_ == SS_CLOSED) {
1049 // if empty, reset the write position to the beginning, so we can get
1050 // the biggest possible block
1051 if (data_length_ == 0) {
1055 const size_t write_position = (read_position_ + data_length_)
1057 *size = (write_position > read_position_ || data_length_ == 0) ?
1058 buffer_length_ - write_position : read_position_ - write_position;
1059 return &buffer_[write_position];
// Commits |size| bytes as buffered data; |size| is asserted to fit in the
// free space. If the FIFO was empty beforehand and |size| > 0, an SE_READ
// event is posted to the owner thread.
1062 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
1063 CritScope cs(&crit_);
1064 ASSERT(size <= buffer_length_ - data_length_);
1065 const bool was_readable = (data_length_ > 0);
1066 data_length_ += size;
1067 if (!was_readable && size > 0) {
1068 PostEvent(owner_, SE_READ, 0);
// Stores the number of free bytes in *size (under crit_).
1072 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
1073 CritScope cs(&crit_);
1074 *size = buffer_length_ - data_length_;
// Copies buffered bytes starting |offset| bytes past the read position into
// |buffer| without consuming them. It takes no lock itself; the callers
// visible in this file hold crit_ around it. When |offset| is at or past the
// buffered data, returns SR_BLOCK for an open FIFO and SR_EOS for a closed
// one. The copy is done in two pieces to handle wrap-around: up to the end of
// the storage, then the remainder from its start.
1078 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
1081 size_t* bytes_read) {
1082 if (offset >= data_length_) {
1083 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
1086 const size_t available = data_length_ - offset;
1087 const size_t read_position = (read_position_ + offset) % buffer_length_;
1088 const size_t copy = _min(bytes, available);
1089 const size_t tail_copy = _min(copy, buffer_length_ - read_position);
1090 char* const p = static_cast<char*>(buffer);
1091 memcpy(p, &buffer_[read_position], tail_copy);
1092 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
// Copies bytes from |buffer| into the FIFO starting |offset| bytes past the
// end of the buffered data, without changing data_length_. It takes no lock
// itself; the callers visible in this file hold crit_ around it. A closed
// FIFO and a FIFO with no room at that offset are each handled up front. The
// copy is done in two pieces to handle wrap-around; *bytes_written (when
// supplied) receives the number of bytes copied.
1100 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
1103 size_t* bytes_written) {
1104 if (state_ == SS_CLOSED) {
1108 if (data_length_ + offset >= buffer_length_) {
1112 const size_t available = buffer_length_ - data_length_ - offset;
1113 const size_t write_position = (read_position_ + data_length_ + offset)
1115 const size_t copy = _min(bytes, available);
1116 const size_t tail_copy = _min(copy, buffer_length_ - write_position);
1117 const char* const p = static_cast<const char*>(buffer);
1118 memcpy(&buffer_[write_position], p, tail_copy);
1119 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
1121 if (bytes_written) {
1122 *bytes_written = copy;
1129 ///////////////////////////////////////////////////////////////////////////////
1131 ///////////////////////////////////////////////////////////////////////////////
// Adapts |stream| so that traffic is logged at severity |level|; |hex_mode|
// is stored and later passed to LogMultiline().
1133 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
1134 const std::string& label, bool hex_mode)
1135 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
// Builds label_ so that it contains |label| (appended to whatever the
// preceding, non-visible statements put there).
1139 void LoggingAdapter::set_label(const std::string& label) {
1141 label_.append(label);
// Reads through the base adapter and, on success, logs the *read bytes
// received via LogMultiline() (direction flag true). |read| may be NULL; a
// local is substituted so the count is always available.
1145 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
1146 size_t* read, int* error) {
1147 size_t local_read; if (!read) read = &local_read;
1148 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
1150 if (result == SR_SUCCESS) {
1151 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
// Writes through the base adapter and, on success, logs the *written bytes
// sent via LogMultiline() (direction flag false). |written| may be NULL; a
// local is substituted so the count is always available.
1156 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
1157 size_t* written, int* error) {
1158 size_t local_written;
1159 if (!written) written = &local_written;
1160 StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
1162 if (result == SR_SUCCESS) {
1163 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
// Calls LogMultiline() with a NULL/zero-length payload for both directions,
// logs a "Closed locally" line, then closes the wrapped stream.
// NOTE(review): the NULL-payload calls presumably flush partially
// accumulated log lines -- confirm against LogMultiline().
1169 void LoggingAdapter::Close() {
1170 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1171 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1172 LOG_V(level_) << label_ << " Closed locally";
1173 StreamAdapterInterface::Close();
// Event hook: logs " Open" on SE_OPEN; on SE_CLOSE calls LogMultiline() with
// a NULL payload for both directions and logs the close error. The event is
// then forwarded to the base adapter's OnEvent().
1176 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
1177 if (events & SE_OPEN) {
1178 LOG_V(level_) << label_ << " Open";
1179 } else if (events & SE_CLOSE) {
1180 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
1181 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
1182 LOG_V(level_) << label_ << " Closed with error: " << err;
1184 StreamAdapterInterface::OnEvent(stream, events, err);
1187 ///////////////////////////////////////////////////////////////////////////////
1188 // StringStream - Reads/Writes to an external std::string
1189 ///////////////////////////////////////////////////////////////////////////////
// Read/write stream over the caller's string |str| (held by reference).
1191 StringStream::StringStream(std::string& str)
1192 : str_(str), read_pos_(0), read_only_(false) {
// Read-only stream over |str|: constness is cast away to bind the reference
// member, and read_only_ is set to true.
1195 StringStream::StringStream(const std::string& str)
1196 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
// Reports the stream state (body not visible here).
1199 StreamState StringStream::GetState() const {
// Copies up to |buffer_len| bytes from the string, starting at read_pos_,
// into |buffer| and advances read_pos_ by the number copied. The amount is
// limited to what remains in the string.
1203 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
1204 size_t* read, int* error) {
1205 size_t available = _min(buffer_len, str_.size() - read_pos_);
1208 memcpy(buffer, str_.data() + read_pos_, available);
1209 read_pos_ += available;
// Appends |data_len| bytes from |data| to the end of the string and reports
// |data_len| through *written.
1215 StreamResult StringStream::Write(const void* data, size_t data_len,
1216 size_t* written, int* error) {
1223 str_.append(static_cast<const char*>(data),
1224 static_cast<const char*>(data) + data_len);
1226 *written = data_len;
// Close.
1230 void StringStream::Close() {
// Moves the read position to |position|; positions past the end of the
// string are rejected before the assignment.
1233 bool StringStream::SetPosition(size_t position) {
1234 if (position > str_.size())
1236 read_pos_ = position;
// Stores the current read position in *position.
1240 bool StringStream::GetPosition(size_t* position) const {
1242 *position = read_pos_;
// Stores the length of the string in *size.
1246 bool StringStream::GetSize(size_t* size) const {
1248 *size = str_.size();
// Stores the number of unread bytes in *size.
1252 bool StringStream::GetAvailable(size_t* size) const {
1254 *size = str_.size() - read_pos_;
// Reserve request (body not visible here).
1258 bool StringStream::ReserveSize(size_t size) {
1265 ///////////////////////////////////////////////////////////////////////////////
1267 ///////////////////////////////////////////////////////////////////////////////
// Creates the first reference to |stream|. The adapter is told it does not
// own the stream; a new StreamRefCount object is created for it instead.
1269 StreamReference::StreamReference(StreamInterface* stream)
1270 : StreamAdapterInterface(stream, false) {
1271 // owner set to false so the destructor does not free the stream.
1272 stream_ref_count_ = new StreamRefCount(stream);
// Adds a reference on the shared counter and returns a new StreamReference
// that uses the same counter and the same underlying stream.
1275 StreamInterface* StreamReference::NewReference() {
1276 stream_ref_count_->AddReference();
1277 return new StreamReference(stream_ref_count_, stream());
// Drops this object's reference on the shared counter.
1280 StreamReference::~StreamReference() {
1281 stream_ref_count_->Release();
// Constructor used by NewReference(): adopts an existing counter rather than
// creating a new one.
1284 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
1285 StreamInterface* stream)
1286 : StreamAdapterInterface(stream, false),
1287 stream_ref_count_(stream_ref_count) {
1290 ///////////////////////////////////////////////////////////////////////////////
// Pumps data from |source| to |sink| through the caller's scratch |buffer|
// of |buffer_len| bytes (asserted > 0). |data_len| carries state across
// calls: its value is used as the initial fill level of |buffer|, and when a
// read or write stops early the count of bytes still pending in |buffer| is
// stored back, after moving any unwritten tail to the front. Each round reads
// until the buffer is full or the source reports end of stream, then writes
// until the buffer is empty; rounds repeat until end of stream.
1292 StreamResult Flow(StreamInterface* source,
1293 char* buffer, size_t buffer_len,
1294 StreamInterface* sink,
1295 size_t* data_len /* = NULL */) {
1296 ASSERT(buffer_len > 0);
1298 StreamResult result;
1299 size_t count, read_pos, write_pos;
1301 read_pos = *data_len;
1306 bool end_of_stream = false;
1308 // Read until buffer is full, end of stream, or error
1309 while (!end_of_stream && (read_pos < buffer_len)) {
1310 result = source->Read(buffer + read_pos, buffer_len - read_pos,
1312 if (result == SR_EOS) {
1313 end_of_stream = true;
1314 } else if (result != SR_SUCCESS) {
1316 *data_len = read_pos;
1324 // Write until buffer is empty, or error (including end of stream)
1326 while (write_pos < read_pos) {
1327 result = sink->Write(buffer + write_pos, read_pos - write_pos,
1329 if (result != SR_SUCCESS) {
1331 *data_len = read_pos - write_pos;
1332 if (write_pos > 0) {
1333 memmove(buffer, buffer + write_pos, *data_len);
1342 } while (!end_of_stream);
1350 ///////////////////////////////////////////////////////////////////////////////
1352 } // namespace talk_base