SkRWBuffer for thread-safe 'stream' sharing
authorreed <reed@chromium.org>
Wed, 29 Apr 2015 00:50:31 +0000 (17:50 -0700)
committerCommit bot <commit-bot@chromium.org>
Wed, 29 Apr 2015 00:50:32 +0000 (17:50 -0700)
WIP
- Can accumulate (write) data in one thread, and share snapshots of it in other threads
  ... e.g. network accumulates image data, and periodically we want to decode/draw it
- If this sort of thing sticks, should we promote SkData to have the same generality as
  SkRBuffer?

BUG=skia:
TBR=

Review URL: https://codereview.chromium.org/1106113002

gyp/core.gypi
src/core/SkRWBuffer.cpp [new file with mode: 0644]
src/core/SkRWBuffer.h [new file with mode: 0644]
tests/DataRefTest.cpp

index 94878e1..190236f 100644 (file)
         '<(skia_src_path)/core/SkRRect.cpp',
         '<(skia_src_path)/core/SkRTree.h',
         '<(skia_src_path)/core/SkRTree.cpp',
+        '<(skia_src_path)/core/SkRWBuffer.cpp',
         '<(skia_src_path)/core/SkScalar.cpp',
         '<(skia_src_path)/core/SkScalerContext.cpp',
         '<(skia_src_path)/core/SkScalerContext.h',
diff --git a/src/core/SkRWBuffer.cpp b/src/core/SkRWBuffer.cpp
new file mode 100644 (file)
index 0000000..33d82af
--- /dev/null
@@ -0,0 +1,353 @@
+/*
+ * Copyright 2015 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
+#include "SkRWBuffer.h"
+#include "SkStream.h"
+
+// Force small chunks to be a page's worth
+static const size_t kMinAllocSize = 4096;
+
+struct SkBufferBlock {
+    SkBufferBlock*  fNext;
+    size_t          fUsed;
+    size_t          fCapacity;
+    
+    const void* startData() const { return this + 1; };
+    
+    size_t avail() const { return fCapacity - fUsed; }
+    void* availData() { return (char*)this->startData() + fUsed; }
+    
+    static SkBufferBlock* Alloc(size_t length) {
+        size_t capacity = LengthToCapacity(length);
+        SkBufferBlock* block = (SkBufferBlock*)sk_malloc_throw(sizeof(SkBufferBlock) + capacity);
+        block->fNext = NULL;
+        block->fUsed = 0;
+        block->fCapacity = capacity;
+        return block;
+    }
+
+    // Return number of bytes actually appended
+    size_t append(const void* src, size_t length) {
+        this->validate();
+        size_t amount = SkTMin(this->avail(), length);
+        memcpy(this->availData(), src, amount);
+        fUsed += amount;
+        this->validate();
+        return amount;
+    }
+
+    void validate() const {
+#ifdef SK_DEBUG
+        SkASSERT(fCapacity > 0);
+        SkASSERT(fUsed <= fCapacity);
+#endif
+    }
+
+private:
+    static size_t LengthToCapacity(size_t length) {
+        const size_t minSize = kMinAllocSize - sizeof(SkBufferBlock);
+        return SkTMax(length, minSize);
+    }
+};
+
+struct SkBufferHead {
+    mutable int32_t fRefCnt;
+    SkBufferBlock   fBlock;
+
+    static size_t LengthToCapacity(size_t length) {
+        const size_t minSize = kMinAllocSize - sizeof(SkBufferHead);
+        return SkTMax(length, minSize);
+    }
+
+    static SkBufferHead* Alloc(size_t length) {
+        size_t capacity = LengthToCapacity(length);
+        size_t size = sizeof(SkBufferHead) + capacity;
+        SkBufferHead* head = (SkBufferHead*)sk_malloc_throw(size);
+        head->fRefCnt = 1;
+        head->fBlock.fNext = NULL;
+        head->fBlock.fUsed = 0;
+        head->fBlock.fCapacity = capacity;
+        return head;
+    }
+    
+    void ref() const {
+        SkASSERT(fRefCnt > 0);
+        sk_atomic_inc(&fRefCnt);
+    }
+    
+    void unref() const {
+        SkASSERT(fRefCnt > 0);
+        // A release here acts in place of all releases we "should" have been doing in ref().
+        if (1 == sk_atomic_fetch_add(&fRefCnt, -1, sk_memory_order_acq_rel)) {
+            // Like unique(), the acquire is only needed on success.
+            SkBufferBlock* block = fBlock.fNext;
+            sk_free((void*)this);
+            while (block) {
+                SkBufferBlock* next = block->fNext;
+                sk_free(block);
+                block = next;
+            }
+        }
+    }
+    
+    void validate(size_t minUsed, SkBufferBlock* tail = NULL) const {
+#ifdef SK_DEBUG
+        SkASSERT(fRefCnt > 0);
+        size_t totalUsed = 0;
+        const SkBufferBlock* block = &fBlock;
+        const SkBufferBlock* lastBlock = block;
+        while (block) {
+            block->validate();
+            totalUsed += block->fUsed;
+            lastBlock = block;
+            block = block->fNext;
+        }
+        SkASSERT(minUsed <= totalUsed);
+        if (tail) {
+            SkASSERT(tail == lastBlock);
+        }
+#endif
+    }
+};
+
+SkROBuffer::SkROBuffer(const SkBufferHead* head, size_t used) : fHead(head), fUsed(used) {
+    if (head) {
+        fHead->ref();
+        SkASSERT(used > 0);
+        head->validate(used);
+    } else {
+        SkASSERT(0 == used);
+    }
+}
+
+SkROBuffer::~SkROBuffer() {
+    if (fHead) {
+        fHead->validate(fUsed);
+        fHead->unref();
+    }
+}
+
+SkROBuffer::Iter::Iter(const SkROBuffer* buffer) {
+    this->reset(buffer);
+}
+
+void SkROBuffer::Iter::reset(const SkROBuffer* buffer) {
+    if (buffer) {
+        fBlock = &buffer->fHead->fBlock;
+        fRemaining = buffer->fUsed;
+    } else {
+        fBlock = NULL;
+        fRemaining = 0;
+    }
+}
+
+const void* SkROBuffer::Iter::data() const {
+    return fRemaining ? fBlock->startData() : NULL;
+}
+
+size_t SkROBuffer::Iter::size() const {
+    return SkTMin(fBlock->fUsed, fRemaining);
+}
+
+bool SkROBuffer::Iter::next() {
+    if (fRemaining) {
+        fRemaining -= this->size();
+        fBlock = fBlock->fNext;
+    }
+    return fRemaining != 0;
+}
+
+SkRWBuffer::SkRWBuffer(size_t initialCapacity) : fHead(NULL), fTail(NULL), fTotalUsed(0) {}
+
+SkRWBuffer::~SkRWBuffer() {
+    this->validate();
+    fHead->unref();
+}
+
+void SkRWBuffer::append(const void* src, size_t length) {
+    this->validate();
+    if (0 == length) {
+        return;
+    }
+
+    fTotalUsed += length;
+
+    if (NULL == fHead) {
+        fHead = SkBufferHead::Alloc(length);
+        fTail = &fHead->fBlock;
+    }
+
+    size_t written = fTail->append(src, length);
+    SkASSERT(written <= length);
+    src = (const char*)src + written;
+    length -= written;
+
+    if (length) {
+        SkBufferBlock* block = SkBufferBlock::Alloc(length);
+        fTail->fNext = block;
+        fTail = block;
+        written = fTail->append(src, length);
+        SkASSERT(written == length);
+    }
+    this->validate();
+}
+
+void* SkRWBuffer::append(size_t length) {
+    this->validate();
+    if (0 == length) {
+        return NULL;
+    }
+
+    fTotalUsed += length;
+    
+    if (NULL == fHead) {
+        fHead = SkBufferHead::Alloc(length);
+        fTail = &fHead->fBlock;
+    } else if (fTail->avail() < length) {
+        SkBufferBlock* block = SkBufferBlock::Alloc(length);
+        fTail->fNext = block;
+        fTail = block;
+    }
+
+    fTail->fUsed += length;
+    this->validate();
+    return (char*)fTail->availData() - length;
+}
+
+#ifdef SK_DEBUG
+void SkRWBuffer::validate() const {
+    if (fHead) {
+        fHead->validate(fTotalUsed, fTail);
+    } else {
+        SkASSERT(NULL == fTail);
+        SkASSERT(0 == fTotalUsed);
+    }
+}
+#endif
+
+SkROBuffer* SkRWBuffer::newRBufferSnapshot() const {
+    return SkNEW_ARGS(SkROBuffer, (fHead, fTotalUsed));
+}
+
+///////////////////////////////////////////////////////////////////////////////////////////////////
+
+class SkROBufferStreamAsset : public SkStreamAsset {
+    void validate() const {
+#ifdef SK_DEBUG
+        SkASSERT(fGlobalOffset <= fBuffer->size());
+        SkASSERT(fLocalOffset <= fIter.size());
+        SkASSERT(fLocalOffset <= fGlobalOffset);
+#endif
+    }
+
+#ifdef SK_DEBUG
+    class AutoValidate {
+        SkROBufferStreamAsset* fStream;
+    public:
+        AutoValidate(SkROBufferStreamAsset* stream) : fStream(stream) { stream->validate(); }
+        ~AutoValidate() { fStream->validate(); }
+    };
+    #define AUTO_VALIDATE   AutoValidate av(this);
+#else
+    #define AUTO_VALIDATE
+#endif
+
+public:
+    SkROBufferStreamAsset(const SkROBuffer* buffer) : fBuffer(SkRef(buffer)), fIter(buffer) {
+        fGlobalOffset = fLocalOffset = 0;
+    }
+
+    virtual ~SkROBufferStreamAsset() { fBuffer->unref(); }
+
+    size_t getLength() const override { return fBuffer->size(); }
+
+    bool rewind() override {
+        AUTO_VALIDATE
+        fIter.reset(fBuffer);
+        fGlobalOffset = fLocalOffset = 0;
+        return true;
+    }
+
+    size_t read(void* dst, size_t request) override {
+        AUTO_VALIDATE
+        size_t bytesRead = 0;
+        for (;;) {
+            size_t size = fIter.size();
+            SkASSERT(fLocalOffset <= size);
+            size_t avail = SkTMin(size - fLocalOffset, request - bytesRead);
+            if (dst) {
+                memcpy(dst, (const char*)fIter.data() + fLocalOffset, avail);
+                dst = (char*)dst + avail;
+            }
+            bytesRead += avail;
+            fLocalOffset += avail;
+            SkASSERT(bytesRead <= request);
+            if (bytesRead == request) {
+                break;
+            }
+            // If we get here, we've exhausted the current iter
+            SkASSERT(fLocalOffset == size);
+            fLocalOffset = 0;
+            if (!fIter.next()) {
+                break;   // ran out of data
+            }
+        }
+        fGlobalOffset += bytesRead;
+        SkASSERT(fGlobalOffset <= fBuffer->size());
+        return bytesRead;
+    }
+
+    bool isAtEnd() const override {
+        return fBuffer->size() == fGlobalOffset;
+    }
+    
+    SkStreamAsset* duplicate() const override {
+        return SkNEW_ARGS(SkROBufferStreamAsset, (fBuffer));
+    }
+    
+    size_t getPosition() const {
+        return fGlobalOffset;
+    }
+    
+    bool seek(size_t position) {
+        AUTO_VALIDATE
+        if (position < fGlobalOffset) {
+            this->rewind();
+        }
+        (void)this->skip(position - fGlobalOffset);
+        return true;
+    }
+    
+    bool move(long offset) {
+        AUTO_VALIDATE
+        offset += fGlobalOffset;
+        if (offset <= 0) {
+            this->rewind();
+        } else {
+            (void)this->seek(SkToSizeT(offset));
+        }
+        return true;
+    }
+    
+    SkStreamAsset* fork() const override {
+        SkStreamAsset* clone = this->duplicate();
+        clone->seek(this->getPosition());
+        return clone;
+    }
+    
+
+private:
+    const SkROBuffer*   fBuffer;
+    SkROBuffer::Iter    fIter;
+    size_t              fLocalOffset;
+    size_t              fGlobalOffset;
+};
+
+SkStreamAsset* SkRWBuffer::newStreamSnapshot() const {
+    SkAutoTUnref<SkROBuffer> buffer(this->newRBufferSnapshot());
+    return SkNEW_ARGS(SkROBufferStreamAsset, (buffer));
+}
diff --git a/src/core/SkRWBuffer.h b/src/core/SkRWBuffer.h
new file mode 100644 (file)
index 0000000..89cb425
--- /dev/null
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2015 Google Inc.
+ *
+ * Use of this source code is governed by a BSD-style license that can be
+ * found in the LICENSE file.
+ */
+
+#ifndef SkRWBuffer_DEFINED
+#define SkRWBuffer_DEFINED
+
+#include "SkRefCnt.h"
+
+struct SkBufferBlock;
+struct SkBufferHead;
+class SkRWBuffer;
+class SkStreamAsset;
+
+/**
+ *  Contains a read-only, thread-sharable block of memory. To access the memory, the caller must
+ *  instantiate a local iterator, as the memory is stored in 1 or more contiguous blocks.
+ */
+class SkROBuffer : public SkRefCnt {
+public:
+    /**
+     *  Return the logical length of the data owned/shared by this buffer. It may be stored in
+     *  multiple contiguous blocks, accessible via the iterator.
+     */
+    size_t size() const { return fUsed; }
+    
+    class Iter {
+    public:
+        Iter(const SkROBuffer*);
+
+        void reset(const SkROBuffer*);
+
+        /**
+         *  Return the current continuous block of memory, or NULL if the iterator is exhausted
+         */
+        const void* data() const;
+
+        /**
+         *  Returns the number of bytes in the current continguous block of memory, or 0 if the
+         *  iterator is exhausted.
+         */
+        size_t size() const;
+
+        /**
+         *  Advance to the next contiguous block of memory, returning true if there is another
+         *  block, or false if the iterator is exhausted.
+         */
+        bool next();
+        
+    private:
+        const SkBufferBlock* fBlock;
+        size_t               fRemaining;
+    };
+
+private:
+    SkROBuffer(const SkBufferHead* head, size_t used);
+    virtual ~SkROBuffer();
+    
+    const SkBufferHead* fHead;
+    const size_t        fUsed;
+    
+    friend class SkRWBuffer;
+};
+
+/**
+ *  Accumulates bytes of memory that are "appended" to it, growing internal storage as needed.
+ *  The growth is done such that at any time, a RBuffer or StreamAsset can be snapped off, which
+ *  can see the previously stored bytes, but which will be unaware of any future writes.
+ */
+class SkRWBuffer {
+public:
+    SkRWBuffer(size_t initialCapacity = 0);
+    ~SkRWBuffer();
+    
+    size_t size() const { return fTotalUsed; }
+    void append(const void* buffer, size_t length);
+    void* append(size_t length);
+
+    SkROBuffer* newRBufferSnapshot() const;
+    SkStreamAsset* newStreamSnapshot() const;
+    
+#ifdef SK_DEBUG
+    void validate() const;
+#else
+    void validate() const {}
+#endif
+    
+private:
+    SkBufferHead*   fHead;
+    SkBufferBlock*  fTail;
+    size_t          fTotalUsed;
+};
+
+#endif
index 099e909..981ac54 100644 (file)
@@ -232,3 +232,84 @@ DEF_TEST(Data, reporter) {
     test_cstring(reporter);
     test_files(reporter);
 }
+
+///////////////////////////////////////////////////////////////////////////////////////////////////
+#include "SkRWBuffer.h"
+
+const char gABC[] = "abcdefghijklmnopqrstuvwxyz";
+
+static void check_abcs(skiatest::Reporter* reporter, const char buffer[], size_t size) {
+    REPORTER_ASSERT(reporter, size % 26 == 0);
+    for (size_t offset = 0; offset < size; offset += 26) {
+        REPORTER_ASSERT(reporter, !memcmp(&buffer[offset], gABC, 26));
+    }
+}
+
+// stream should contains an integral number of copies of gABC.
+static void check_alphabet_stream(skiatest::Reporter* reporter, SkStream* stream) {
+    REPORTER_ASSERT(reporter, stream->hasLength());
+    size_t size = stream->getLength();
+    REPORTER_ASSERT(reporter, size % 26 == 0);
+
+    SkAutoTMalloc<char> storage(size);
+    char* array = storage.get();
+    size_t bytesRead = stream->read(array, size);
+    REPORTER_ASSERT(reporter, bytesRead == size);
+    check_abcs(reporter, array, size);
+
+    // try checking backwards
+    for (size_t offset = size; offset > 0; offset -= 26) {
+        REPORTER_ASSERT(reporter, stream->seek(offset - 26));
+        REPORTER_ASSERT(reporter, stream->getPosition() == offset - 26);
+        REPORTER_ASSERT(reporter, stream->read(array, 26) == 26);
+        check_abcs(reporter, array, 26);
+        REPORTER_ASSERT(reporter, stream->getPosition() == offset);
+    }
+}
+
+// reader should contains an integral number of copies of gABC.
+static void check_alphabet_buffer(skiatest::Reporter* reporter, const SkROBuffer* reader) {
+    size_t size = reader->size();
+    REPORTER_ASSERT(reporter, size % 26 == 0);
+    
+    SkAutoTMalloc<char> storage(size);
+    SkROBuffer::Iter iter(reader);
+    size_t offset = 0;
+    do {
+        SkASSERT(offset + iter.size() <= size);
+        memcpy(storage.get() + offset, iter.data(), iter.size());
+        offset += iter.size();
+    } while (iter.next());
+    REPORTER_ASSERT(reporter, offset == size);
+    check_abcs(reporter, storage.get(), size);
+}
+
+DEF_TEST(RWBuffer, reporter) {
+    // Knowing that the default capacity is 4096, choose N large enough so we force it to use
+    // multiple buffers internally.
+    const int N = 1000;
+    SkROBuffer* readers[N];
+    SkStream* streams[N];
+
+    {
+        SkRWBuffer buffer;
+        for (int i = 0; i < N; ++i) {
+            if (0 == (i & 1)) {
+                buffer.append(gABC, 26);
+            } else {
+                memcpy(buffer.append(26), gABC, 26);
+            }
+            readers[i] = buffer.newRBufferSnapshot();
+            streams[i] = buffer.newStreamSnapshot();
+        }
+        REPORTER_ASSERT(reporter, N*26 == buffer.size());
+    }
+
+    for (int i = 0; i < N; ++i) {
+        REPORTER_ASSERT(reporter, (i + 1) * 26U == readers[i]->size());
+        check_alphabet_buffer(reporter, readers[i]);
+        check_alphabet_stream(reporter, streams[i]);
+        readers[i]->unref();
+        SkDELETE(streams[i]);
+    }
+}