1 // Protocol Buffers - Google's data interchange format
2 // Copyright 2008 Google Inc. All rights reserved.
3 // http://code.google.com/p/protobuf/
5 // Redistribution and use in source and binary forms, with or without
6 // modification, are permitted provided that the following conditions are
9 // * Redistributions of source code must retain the above copyright
10 // notice, this list of conditions and the following disclaimer.
11 // * Redistributions in binary form must reproduce the above
12 // copyright notice, this list of conditions and the following disclaimer
13 // in the documentation and/or other materials provided with the
15 // * Neither the name of Google Inc. nor the names of its
16 // contributors may be used to endorse or promote products derived from
17 // this software without specific prior written permission.
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 // Author: kenton@google.com (Kenton Varda)
32 // Based on original Protocol Buffers design by
33 // Sanjay Ghemawat, Jeff Dean, and others.
39 #include <sys/types.h>
47 #include <google/protobuf/io/zero_copy_stream_impl.h>
48 #include <google/protobuf/stubs/common.h>
49 #include <google/protobuf/stubs/stl_util.h>
57 // Win32 lseek is broken: If invoked on a non-seekable file descriptor, its
58 // return value is undefined. We re-define it to always produce an error.
59 #define lseek(fd, offset, origin) ((off_t)-1)
65 int close_no_eintr(int fd) {
69 } while (result < 0 && errno == EINTR);
76 // ===================================================================
78 FileInputStream::FileInputStream(int file_descriptor, int block_size)
79 : copying_input_(file_descriptor),
80 impl_(©ing_input_, block_size) {
83 FileInputStream::~FileInputStream() {}
85 bool FileInputStream::Close() {
86 return copying_input_.Close();
89 bool FileInputStream::Next(const void** data, int* size) {
90 return impl_.Next(data, size);
93 void FileInputStream::BackUp(int count) {
97 bool FileInputStream::Skip(int count) {
98 return impl_.Skip(count);
101 int64 FileInputStream::ByteCount() const {
102 return impl_.ByteCount();
105 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
107 : file_(file_descriptor),
108 close_on_delete_(false),
111 previous_seek_failed_(false) {
114 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
115 if (close_on_delete_) {
117 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
122 bool FileInputStream::CopyingFileInputStream::Close() {
123 GOOGLE_CHECK(!is_closed_);
126 if (close_no_eintr(file_) != 0) {
127 // The docs on close() do not specify whether a file descriptor is still
128 // open after close() fails with EIO. However, the glibc source code
129 // seems to indicate that it is not.
137 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
138 GOOGLE_CHECK(!is_closed_);
142 result = read(file_, buffer, size);
143 } while (result < 0 && errno == EINTR);
146 // Read error (not EOF).
153 int FileInputStream::CopyingFileInputStream::Skip(int count) {
154 GOOGLE_CHECK(!is_closed_);
156 if (!previous_seek_failed_ &&
157 lseek(file_, count, SEEK_CUR) != (off_t)-1) {
163 // Note to self: Don't seek again. This file descriptor doesn't
165 previous_seek_failed_ = true;
167 // Use the default implementation.
168 return CopyingInputStream::Skip(count);
172 // ===================================================================
174 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
175 : copying_output_(file_descriptor),
176 impl_(©ing_output_, block_size) {
179 FileOutputStream::~FileOutputStream() {
183 bool FileOutputStream::Close() {
184 bool flush_succeeded = impl_.Flush();
185 return copying_output_.Close() && flush_succeeded;
188 bool FileOutputStream::Flush() {
189 return impl_.Flush();
192 bool FileOutputStream::Next(void** data, int* size) {
193 return impl_.Next(data, size);
196 void FileOutputStream::BackUp(int count) {
200 int64 FileOutputStream::ByteCount() const {
201 return impl_.ByteCount();
204 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
206 : file_(file_descriptor),
207 close_on_delete_(false),
212 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
213 if (close_on_delete_) {
215 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
220 bool FileOutputStream::CopyingFileOutputStream::Close() {
221 GOOGLE_CHECK(!is_closed_);
224 if (close_no_eintr(file_) != 0) {
225 // The docs on close() do not specify whether a file descriptor is still
226 // open after close() fails with EIO. However, the glibc source code
227 // seems to indicate that it is not.
235 bool FileOutputStream::CopyingFileOutputStream::Write(
236 const void* buffer, int size) {
237 GOOGLE_CHECK(!is_closed_);
238 int total_written = 0;
240 const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
242 while (total_written < size) {
245 bytes = write(file_, buffer_base + total_written, size - total_written);
246 } while (bytes < 0 && errno == EINTR);
251 // FIXME(kenton): According to the man page, if write() returns zero,
252 // there was no error; write() simply did not write anything. It's
253 // unclear under what circumstances this might happen, but presumably
254 // errno won't be set in this case. I am confused as to how such an
255 // event should be handled. For now I'm treating it as an error, since
256 // retrying seems like it could lead to an infinite loop. I suspect
257 // this never actually happens anyway.
264 total_written += bytes;
270 // ===================================================================
272 IstreamInputStream::IstreamInputStream(istream* input, int block_size)
273 : copying_input_(input),
274 impl_(©ing_input_, block_size) {
277 IstreamInputStream::~IstreamInputStream() {}
279 bool IstreamInputStream::Next(const void** data, int* size) {
280 return impl_.Next(data, size);
283 void IstreamInputStream::BackUp(int count) {
287 bool IstreamInputStream::Skip(int count) {
288 return impl_.Skip(count);
291 int64 IstreamInputStream::ByteCount() const {
292 return impl_.ByteCount();
295 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
300 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
302 int IstreamInputStream::CopyingIstreamInputStream::Read(
303 void* buffer, int size) {
304 input_->read(reinterpret_cast<char*>(buffer), size);
305 int result = input_->gcount();
306 if (result == 0 && input_->fail() && !input_->eof()) {
312 // ===================================================================
314 OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
315 : copying_output_(output),
316 impl_(©ing_output_, block_size) {
319 OstreamOutputStream::~OstreamOutputStream() {
323 bool OstreamOutputStream::Next(void** data, int* size) {
324 return impl_.Next(data, size);
327 void OstreamOutputStream::BackUp(int count) {
331 int64 OstreamOutputStream::ByteCount() const {
332 return impl_.ByteCount();
335 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
340 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
343 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
344 const void* buffer, int size) {
345 output_->write(reinterpret_cast<const char*>(buffer), size);
346 return output_->good();
349 // ===================================================================
351 ConcatenatingInputStream::ConcatenatingInputStream(
352 ZeroCopyInputStream* const streams[], int count)
353 : streams_(streams), stream_count_(count), bytes_retired_(0) {
356 ConcatenatingInputStream::~ConcatenatingInputStream() {
359 bool ConcatenatingInputStream::Next(const void** data, int* size) {
360 while (stream_count_ > 0) {
361 if (streams_[0]->Next(data, size)) return true;
363 // That stream is done. Advance to the next one.
364 bytes_retired_ += streams_[0]->ByteCount();
373 void ConcatenatingInputStream::BackUp(int count) {
374 if (stream_count_ > 0) {
375 streams_[0]->BackUp(count);
377 GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
381 bool ConcatenatingInputStream::Skip(int count) {
382 while (stream_count_ > 0) {
383 // Assume that ByteCount() can be used to find out how much we actually
384 // skipped when Skip() fails.
385 int64 target_byte_count = streams_[0]->ByteCount() + count;
386 if (streams_[0]->Skip(count)) return true;
388 // Hit the end of the stream. Figure out how many more bytes we still have
390 int64 final_byte_count = streams_[0]->ByteCount();
391 GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
392 count = target_byte_count - final_byte_count;
394 // That stream is done. Advance to the next one.
395 bytes_retired_ += final_byte_count;
403 int64 ConcatenatingInputStream::ByteCount() const {
404 if (stream_count_ == 0) {
405 return bytes_retired_;
407 return bytes_retired_ + streams_[0]->ByteCount();
412 // ===================================================================
414 LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
416 : input_(input), limit_(limit) {}
418 LimitingInputStream::~LimitingInputStream() {
419 // If we overshot the limit, back up.
420 if (limit_ < 0) input_->BackUp(-limit_);
423 bool LimitingInputStream::Next(const void** data, int* size) {
424 if (limit_ <= 0) return false;
425 if (!input_->Next(data, size)) return false;
429 // We overshot the limit. Reduce *size to hide the rest of the buffer.
435 void LimitingInputStream::BackUp(int count) {
437 input_->BackUp(count - limit_);
440 input_->BackUp(count);
445 bool LimitingInputStream::Skip(int count) {
446 if (count > limit_) {
447 if (limit_ < 0) return false;
448 input_->Skip(limit_);
452 if (!input_->Skip(count)) return false;
458 int64 LimitingInputStream::ByteCount() const {
460 return input_->ByteCount() + limit_;
462 return input_->ByteCount();
467 // ===================================================================
470 } // namespace protobuf
471 } // namespace google