1 #ifndef _DEBLOCKBUFFER_HPP
2 #define _DEBLOCKBUFFER_HPP
3 /*-------------------------------------------------------------------------
4 * drawElements C++ Base Library
5 * -----------------------------
7 * Copyright 2014 The Android Open Source Project
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
23 * \brief Block-based thread-safe queue.
24 *//*--------------------------------------------------------------------*/
26 #include "deBlockBuffer.hpp"
27 #include "deMutex.hpp"
28 #include "deSemaphore.h"
35 void BlockBuffer_selfTest (void);
37 class BufferCanceledException : public std::exception
40 inline BufferCanceledException (void) {}
41 inline ~BufferCanceledException (void) throw() {}
43 const char* what (void) const throw() { return "BufferCanceledException"; }
50 typedef BufferCanceledException CanceledException;
52 BlockBuffer (int blockSize, int numBlocks);
55 void clear (void); //!< Resets buffer. Will block until pending writes and reads have completed.
57 void write (int numElements, const T* elements);
58 int tryWrite (int numElements, const T* elements);
62 void read (int numElements, T* elements);
63 int tryRead (int numElements, T* elements);
65 void cancel (void); //!< Sets buffer in canceled state. All (including pending) writes and reads will result in CanceledException.
66 bool isCanceled (void) const { return !!m_canceled; }
69 BlockBuffer (const BlockBuffer& other);
70 BlockBuffer& operator= (const BlockBuffer& other);
72 int writeToCurrentBlock (int numElements, const T* elements, bool blocking);
73 int readFromCurrentBlock(int numElements, T* elements, bool blocking);
75 void flushWriteBlock (void);
77 deSemaphore m_fill; //!< Block fill count.
78 deSemaphore m_empty; //!< Block empty count.
80 int m_writeBlock; //!< Current write block ndx.
81 int m_writePos; //!< Position in block. 0 if block is not yet acquired.
83 int m_readBlock; //!< Current read block ndx.
84 int m_readPos; //!< Position in block. 0 if block is not yet acquired.
90 int* m_numUsedInBlock;
95 volatile deUint32 m_canceled;
96 } DE_WARN_UNUSED_TYPE;
99 BlockBuffer<T>::BlockBuffer (int blockSize, int numBlocks)
106 , m_blockSize (blockSize)
107 , m_numBlocks (numBlocks)
108 , m_elements (DE_NULL)
109 , m_numUsedInBlock (DE_NULL)
112 , m_canceled (DE_FALSE)
114 DE_ASSERT(blockSize > 0);
115 DE_ASSERT(numBlocks > 0);
119 m_elements = new T[m_numBlocks*m_blockSize];
120 m_numUsedInBlock = new int[m_numBlocks];
125 delete[] m_numUsedInBlock;
129 m_fill = deSemaphore_create(0, DE_NULL);
130 m_empty = deSemaphore_create(numBlocks, DE_NULL);
131 DE_ASSERT(m_fill && m_empty);
134 template <typename T>
135 BlockBuffer<T>::~BlockBuffer (void)
138 delete[] m_numUsedInBlock;
140 deSemaphore_destroy(m_fill);
141 deSemaphore_destroy(m_empty);
144 template <typename T>
145 void BlockBuffer<T>::clear (void)
147 ScopedLock readLock (m_readLock);
148 ScopedLock writeLock (m_writeLock);
150 deSemaphore_destroy(m_fill);
151 deSemaphore_destroy(m_empty);
153 m_fill = deSemaphore_create(0, DE_NULL);
154 m_empty = deSemaphore_create(m_numBlocks, DE_NULL);
159 m_canceled = DE_FALSE;
161 DE_ASSERT(m_fill && m_empty);
164 template <typename T>
165 void BlockBuffer<T>::cancel (void)
167 DE_ASSERT(!m_canceled);
168 m_canceled = DE_TRUE;
170 deSemaphore_increment(m_empty);
171 deSemaphore_increment(m_fill);
174 template <typename T>
175 int BlockBuffer<T>::writeToCurrentBlock (int numElements, const T* elements, bool blocking)
177 DE_ASSERT(numElements > 0 && elements != DE_NULL);
181 /* Write thread doesn't own current block - need to acquire. */
183 deSemaphore_decrement(m_empty);
186 if (!deSemaphore_tryDecrement(m_empty))
190 /* Check for canceled bit. */
193 // \todo [2012-07-06 pyry] A bit hackish to assume that write lock is not freed if exception is thrown out here.
194 deSemaphore_increment(m_empty);
195 m_writeLock.unlock();
196 throw CanceledException();
200 /* Write thread owns current block. */
201 T* block = m_elements + m_writeBlock*m_blockSize;
202 int numToWrite = de::min(numElements, m_blockSize-m_writePos);
204 DE_ASSERT(numToWrite > 0);
206 for (int ndx = 0; ndx < numToWrite; ndx++)
207 block[m_writePos+ndx] = elements[ndx];
209 m_writePos += numToWrite;
211 if (m_writePos == m_blockSize)
212 flushWriteBlock(); /* Flush current write block. */
217 template <typename T>
218 int BlockBuffer<T>::readFromCurrentBlock (int numElements, T* elements, bool blocking)
220 DE_ASSERT(numElements > 0 && elements != DE_NULL);
224 /* Read thread doesn't own current block - need to acquire. */
226 deSemaphore_decrement(m_fill);
229 if (!deSemaphore_tryDecrement(m_fill))
233 /* Check for canceled bit. */
236 // \todo [2012-07-06 pyry] A bit hackish to assume that read lock is not freed if exception is thrown out here.
237 deSemaphore_increment(m_fill);
239 throw CanceledException();
243 /* Read thread now owns current block. */
244 const T* block = m_elements + m_readBlock*m_blockSize;
245 int numUsedInBlock = m_numUsedInBlock[m_readBlock];
246 int numToRead = de::min(numElements, numUsedInBlock-m_readPos);
248 DE_ASSERT(numToRead > 0);
250 for (int ndx = 0; ndx < numToRead; ndx++)
251 elements[ndx] = block[m_readPos+ndx];
253 m_readPos += numToRead;
255 if (m_readPos == numUsedInBlock)
257 /* Free current read block and advance. */
258 m_readBlock = (m_readBlock+1) % m_numBlocks;
260 deSemaphore_increment(m_empty);
266 template <typename T>
267 int BlockBuffer<T>::tryWrite (int numElements, const T* elements)
271 DE_ASSERT(numElements > 0 && elements != DE_NULL);
274 throw CanceledException();
276 if (!m_writeLock.tryLock())
279 while (numWritten < numElements)
281 int ret = writeToCurrentBlock(numElements-numWritten, elements+numWritten, false /* non-blocking */);
284 break; /* Write failed. */
289 m_writeLock.unlock();
294 template <typename T>
295 void BlockBuffer<T>::write (int numElements, const T* elements)
297 DE_ASSERT(numElements > 0 && elements != DE_NULL);
300 throw CanceledException();
305 while (numWritten < numElements)
306 numWritten += writeToCurrentBlock(numElements-numWritten, elements+numWritten, true /* blocking */);
308 m_writeLock.unlock();
311 template <typename T>
312 void BlockBuffer<T>::flush (void)
319 m_writeLock.unlock();
322 template <typename T>
323 bool BlockBuffer<T>::tryFlush (void)
325 if (!m_writeLock.tryLock())
331 m_writeLock.unlock();
336 template <typename T>
337 void BlockBuffer<T>::flushWriteBlock (void)
339 DE_ASSERT(de::inRange(m_writePos, 1, m_blockSize));
341 m_numUsedInBlock[m_writeBlock] = m_writePos;
342 m_writeBlock = (m_writeBlock+1) % m_numBlocks;
344 deSemaphore_increment(m_fill);
347 template <typename T>
348 int BlockBuffer<T>::tryRead (int numElements, T* elements)
353 throw CanceledException();
355 if (!m_readLock.tryLock())
358 while (numRead < numElements)
360 int ret = readFromCurrentBlock(numElements-numRead, &elements[numRead], false /* non-blocking */);
373 template <typename T>
374 void BlockBuffer<T>::read (int numElements, T* elements)
376 DE_ASSERT(numElements > 0 && elements != DE_NULL);
379 throw CanceledException();
384 while (numRead < numElements)
385 numRead += readFromCurrentBlock(numElements-numRead, &elements[numRead], true /* blocking */);
392 #endif // _DEBLOCKBUFFER_HPP