1 // Copyright 2011 The Chromium Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
// These memory-resident streams are used for serializing data into a sequential
// region of memory.
10 // Streams are divided into SourceStreams for reading and SinkStreams for
11 // writing. Streams are aggregated into Sets which allows several streams to be
12 // used at once. Example: we can write A1, B1, A2, B2 but achieve the memory
13 // layout A1 A2 B1 B2 by writing 'A's to one stream and 'B's to another.
// The aggregated streams are important to Courgette's compression efficiency:
// we use them to cluster similar kinds of data, which helps to generate longer
// common subsequences and repeated sequences.
19 #include "courgette/streams.h"
25 #include "base/logging.h"
// Version tag for the serialized form of a StreamSet. It is written at the
// start of every stream-set header and compared on read, so bump it whenever
// the serialization format changes.
static constexpr unsigned int kStreamsSerializationFormatVersion = 20090218u;
// This is a cut down Varint implementation, implementing only what we use for
// streams.
class Varint {
 public:
  // Maximum length, in bytes, of the varint encoding of a uint32_t.
  static const int kMax32 = 5;

  // Parses a Varint32 encoded value from |source| and stores it in |output|,
  // and returns a pointer to the following byte.  Returns nullptr if a valid
  // varint value was not found before |limit|.  |output| is only written on
  // success.
  static const uint8_t* Parse32WithLimit(const uint8_t* source,
                                         const uint8_t* limit,
                                         uint32_t* output);

  // Writes the Varint32 encoded representation of |value| to buffer
  // |destination|.  |destination| must have sufficient length to hold kMax32
  // bytes.  Returns a pointer to the byte just past the last encoded byte.
  static uint8_t* Encode32(uint8_t* destination, uint32_t value);
};

// Parses a Varint32 encoded unsigned number from |source|.  The Varint32
// encoding is a little-endian sequence of bytes containing base-128 digits,
// with the high order bit set to indicate if there are more digits.
//
// For each byte, we mask out the digit and 'or' it into the right place in the
// result.
//
// The digit loop is unrolled for performance.  It usually exits after the first
// digit.
//
// Returns nullptr when |limit| is reached before the final digit, when more
// than kMax32 digits are present, or when the fifth digit carries bits that do
// not fit in 32 bits.
const uint8_t* Varint::Parse32WithLimit(const uint8_t* source,
                                        const uint8_t* limit,
                                        uint32_t* output) {
  uint32_t digit = 0;
  uint32_t result = 0;

  if (source >= limit)
    return nullptr;
  digit = *(source++);
  result = digit & 127;
  if (digit < 128) {
    *output = result;
    return source;
  }

  if (source >= limit)
    return nullptr;
  digit = *(source++);
  result |= (digit & 127) << 7;
  if (digit < 128) {
    *output = result;
    return source;
  }

  if (source >= limit)
    return nullptr;
  digit = *(source++);
  result |= (digit & 127) << 14;
  if (digit < 128) {
    *output = result;
    return source;
  }

  if (source >= limit)
    return nullptr;
  digit = *(source++);
  result |= (digit & 127) << 21;
  if (digit < 128) {
    *output = result;
    return source;
  }

  if (source >= limit)
    return nullptr;
  digit = *(source++);
  // Only the low four bits of the fifth digit fit into a 32-bit result.  A
  // larger digit either has its continuation bit set (the value is too long to
  // be a Varint32) or would silently lose high-order bits when shifted, so
  // both cases are rejected.  Encode32 never produces such a digit.
  if (digit > 15)
    return nullptr;
  result |= digit << 28;
  *output = result;
  return source;
}

// Write the base-128 digits in little-endian order.  All except the last digit
// have the high bit set to indicate more digits.
inline uint8_t* Varint::Encode32(uint8_t* destination, uint32_t value) {
  while (value >= 128) {
    *(destination++) = static_cast<uint8_t>(value) | 128;
    value = value >> 7;
  }
  *(destination++) = static_cast<uint8_t>(value);
  return destination;
}
127 void SourceStream::Init(const SinkStream& sink) {
128 Init(sink.Buffer(), sink.Length());
131 bool SourceStream::Read(void* destination, size_t count) {
132 if (current_ + count > end_)
134 memcpy(destination, current_, count);
139 bool SourceStream::ReadVarint32(uint32_t* output_value) {
140 const uint8_t* after = Varint::Parse32WithLimit(current_, end_, output_value);
147 bool SourceStream::ReadVarint32Signed(int32_t* output_value) {
148 // Signed numbers are encoded as unsigned numbers so that numbers nearer zero
149 // have shorter varint encoding.
150 // 0000xxxx encoded as 000xxxx0.
151 // 1111xxxx encoded as 000yyyy1 where yyyy is complement of xxxx.
152 uint32_t unsigned_value;
153 if (!ReadVarint32(&unsigned_value))
155 if (unsigned_value & 1)
156 *output_value = ~static_cast<int32_t>(unsigned_value >> 1);
158 *output_value = (unsigned_value >> 1);
162 bool SourceStream::ShareSubstream(size_t offset, size_t length,
163 SourceStream* substream) {
164 if (offset > Remaining())
166 if (length > Remaining() - offset)
168 substream->Init(current_ + offset, length);
172 bool SourceStream::ReadSubstream(size_t length, SourceStream* substream) {
173 if (!ShareSubstream(0, length, substream))
179 bool SourceStream::Skip(size_t byte_count) {
180 if (current_ + byte_count > end_)
182 current_ += byte_count;
186 CheckBool SinkStream::Write(const void* data, size_t byte_count) {
187 return buffer_.append(static_cast<const char*>(data), byte_count);
190 CheckBool SinkStream::WriteVarint32(uint32_t value) {
191 uint8_t buffer[Varint::kMax32];
192 uint8_t* end = Varint::Encode32(buffer, value);
193 return Write(buffer, end - buffer);
196 CheckBool SinkStream::WriteVarint32Signed(int32_t value) {
197 // Encode signed numbers so that numbers nearer zero have shorter
199 // 0000xxxx encoded as 000xxxx0.
200 // 1111xxxx encoded as 000yyyy1 where yyyy is complement of xxxx.
203 ret = WriteVarint32(~value * 2 + 1);
205 ret = WriteVarint32(value * 2);
209 CheckBool SinkStream::WriteSizeVarint32(size_t value) {
210 uint32_t narrowed_value = static_cast<uint32_t>(value);
211 // On 32-bit, the compiler should figure out this test always fails.
212 LOG_ASSERT(value == narrowed_value);
213 return WriteVarint32(narrowed_value);
216 CheckBool SinkStream::Append(SinkStream* other) {
217 bool ret = Write(other->buffer_.data(), other->buffer_.size());
223 void SinkStream::Retire() {
227 ////////////////////////////////////////////////////////////////////////////////
229 SourceStreamSet::SourceStreamSet()
230 : count_(kMaxStreams) {
233 SourceStreamSet::~SourceStreamSet() = default;
235 // Initializes from |source|.
236 // The stream set for N streams is serialized as a header
237 // <version><N><length1><length2>...<lengthN>
238 // followed by the stream contents
239 // <bytes1><bytes2>...<bytesN>
241 bool SourceStreamSet::Init(const void* source, size_t byte_count) {
242 const uint8_t* start = static_cast<const uint8_t*>(source);
243 const uint8_t* end = start + byte_count;
245 unsigned int version;
246 const uint8_t* finger = Varint::Parse32WithLimit(start, end, &version);
247 if (finger == nullptr)
249 if (version != kStreamsSerializationFormatVersion)
253 finger = Varint::Parse32WithLimit(finger, end, &count);
254 if (finger == nullptr)
256 if (count > kMaxStreams)
261 unsigned int lengths[kMaxStreams];
262 size_t accumulated_length = 0;
264 for (size_t i = 0; i < count_; ++i) {
265 finger = Varint::Parse32WithLimit(finger, end, &lengths[i]);
266 if (finger == nullptr)
268 accumulated_length += lengths[i];
271 // Remaining bytes should add up to sum of lengths.
272 if (static_cast<size_t>(end - finger) != accumulated_length)
275 accumulated_length = finger - start;
276 for (size_t i = 0; i < count_; ++i) {
277 stream(i)->Init(start + accumulated_length, lengths[i]);
278 accumulated_length += lengths[i];
284 bool SourceStreamSet::Init(SourceStream* source) {
285 // TODO(sra): consume the rest of |source|.
286 return Init(source->Buffer(), source->Remaining());
289 bool SourceStreamSet::ReadSet(SourceStreamSet* set) {
290 uint32_t stream_count = 0;
291 SourceStream* control_stream = this->stream(0);
292 if (!control_stream->ReadVarint32(&stream_count))
295 uint32_t lengths[kMaxStreams] = {}; // i.e. all zero.
297 for (size_t i = 0; i < stream_count; ++i) {
298 if (!control_stream->ReadVarint32(&lengths[i]))
302 for (size_t i = 0; i < stream_count; ++i) {
303 if (!this->stream(i)->ReadSubstream(lengths[i], set->stream(i)))
309 bool SourceStreamSet::Empty() const {
310 for (size_t i = 0; i < count_; ++i) {
311 if (streams_[i].Remaining() != 0)
317 ////////////////////////////////////////////////////////////////////////////////
319 SinkStreamSet::SinkStreamSet()
320 : count_(kMaxStreams) {
323 SinkStreamSet::~SinkStreamSet() = default;
325 void SinkStreamSet::Init(size_t stream_index_limit) {
326 count_ = stream_index_limit;
329 // The header for a stream set for N streams is serialized as
330 // <version><N><length1><length2>...<lengthN>
331 CheckBool SinkStreamSet::CopyHeaderTo(SinkStream* header) {
332 bool ret = header->WriteVarint32(kStreamsSerializationFormatVersion);
334 ret = header->WriteSizeVarint32(count_);
335 for (size_t i = 0; ret && i < count_; ++i) {
336 ret = header->WriteSizeVarint32(stream(i)->Length());
342 // Writes |this| to |combined_stream|. See SourceStreamSet::Init for the layout
343 // of the stream metadata and contents.
344 CheckBool SinkStreamSet::CopyTo(SinkStream *combined_stream) {
346 bool ret = CopyHeaderTo(&header);
350 // Reserve the correct amount of storage.
351 size_t length = header.Length();
352 for (size_t i = 0; i < count_; ++i) {
353 length += stream(i)->Length();
355 ret = combined_stream->Reserve(length);
357 ret = combined_stream->Append(&header);
358 for (size_t i = 0; ret && i < count_; ++i) {
359 ret = combined_stream->Append(stream(i));
365 CheckBool SinkStreamSet::WriteSet(SinkStreamSet* set) {
366 uint32_t lengths[kMaxStreams];
367 // 'stream_count' includes all non-empty streams and all empty stream numbered
368 // lower than a non-empty stream.
369 size_t stream_count = 0;
370 for (size_t i = 0; i < kMaxStreams; ++i) {
371 SinkStream* stream = set->stream(i);
372 lengths[i] = static_cast<uint32_t>(stream->Length());
374 stream_count = i + 1;
377 SinkStream* control_stream = this->stream(0);
378 bool ret = control_stream->WriteSizeVarint32(stream_count);
379 for (size_t i = 0; ret && i < stream_count; ++i) {
380 ret = control_stream->WriteSizeVarint32(lengths[i]);
383 for (size_t i = 0; ret && i < stream_count; ++i) {
384 ret = this->stream(i)->Append(set->stream(i));