1 /*-------------------------------------------------------------------------
2 * drawElements Stream Library
3 * ---------------------------
5 * Copyright 2014 The Android Open Source Project
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
21 * \brief Thread safe ringbuffer
22 *//*--------------------------------------------------------------------*/
23 #include "deRingbuffer.h"
27 #include "deSemaphore.h"
39 deSemaphore emptyCount;
40 deSemaphore fullCount;
49 deBool consumerStopping;
52 deRingbuffer* deRingbuffer_create (deInt32 blockSize, deInt32 blockCount)
54 deRingbuffer* ringbuffer = (deRingbuffer*)deCalloc(sizeof(deRingbuffer));
56 DE_ASSERT(ringbuffer);
57 DE_ASSERT(blockCount > 0);
58 DE_ASSERT(blockSize > 0);
60 ringbuffer->blockSize = blockSize;
61 ringbuffer->blockCount = blockCount;
62 ringbuffer->buffer = (deUint8*)deMalloc(sizeof(deUint8) * (size_t)blockSize * (size_t)blockCount);
63 ringbuffer->blockUsage = (deInt32*)deMalloc(sizeof(deUint32) * (size_t)blockCount);
64 ringbuffer->emptyCount = deSemaphore_create(ringbuffer->blockCount, DE_NULL);
65 ringbuffer->fullCount = deSemaphore_create(0, DE_NULL);
67 if (!ringbuffer->buffer ||
68 !ringbuffer->blockUsage ||
69 !ringbuffer->emptyCount ||
70 !ringbuffer->fullCount)
72 if (ringbuffer->emptyCount)
73 deSemaphore_destroy(ringbuffer->emptyCount);
74 if (ringbuffer->fullCount)
75 deSemaphore_destroy(ringbuffer->fullCount);
76 deFree(ringbuffer->buffer);
77 deFree(ringbuffer->blockUsage);
82 memset(ringbuffer->blockUsage, 0, sizeof(deInt32) * (size_t)blockCount);
84 ringbuffer->outBlock = 0;
85 ringbuffer->outPos = 0;
87 ringbuffer->inBlock = 0;
88 ringbuffer->inPos = 0;
90 ringbuffer->stopNotified = DE_FALSE;
91 ringbuffer->consumerStopping = DE_FALSE;
96 void deRingbuffer_stop (deRingbuffer* ringbuffer)
98 /* Set notify to true and increment fullCount to let consumer continue */
99 ringbuffer->stopNotified = DE_TRUE;
100 deSemaphore_increment(ringbuffer->fullCount);
103 void deRingbuffer_destroy (deRingbuffer* ringbuffer)
105 deSemaphore_destroy(ringbuffer->emptyCount);
106 deSemaphore_destroy(ringbuffer->fullCount);
108 free(ringbuffer->buffer);
109 free(ringbuffer->blockUsage);
113 static deStreamResult producerStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* written)
115 deRingbuffer* ringbuffer = (deRingbuffer*)stream;
118 /* If ringbuffer is stopping return error on write */
119 if (ringbuffer->stopNotified)
122 return DE_STREAMRESULT_ERROR;
127 /* Write while more data available */
128 while (*written < bufSize)
130 deInt32 writeSize = 0;
131 deUint8* src = DE_NULL;
132 deUint8* dst = DE_NULL;
134 /* If between blocks accuire new block */
135 if (ringbuffer->inPos == 0)
137 deSemaphore_decrement(ringbuffer->emptyCount);
140 writeSize = deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written);
141 dst = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos;
142 src = (deUint8*)buf + *written;
144 deMemcpy(dst, src, (size_t)writeSize);
146 ringbuffer->inPos += writeSize;
147 *written += writeSize;
148 ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize;
150 /* Block is full move to next one (or "between" this and next block) */
151 if (ringbuffer->inPos == ringbuffer->blockSize)
153 ringbuffer->inPos = 0;
154 ringbuffer->inBlock++;
156 if (ringbuffer->inBlock == ringbuffer->blockCount)
157 ringbuffer->inBlock = 0;
158 deSemaphore_increment(ringbuffer->fullCount);
162 return DE_STREAMRESULT_SUCCESS;
165 static deStreamResult producerStream_flush (deStreamData* stream)
167 deRingbuffer* ringbuffer = (deRingbuffer*)stream;
171 /* No blocks reserved by producer */
172 if (ringbuffer->inPos == 0)
173 return DE_STREAMRESULT_SUCCESS;
175 ringbuffer->inPos = 0;
176 ringbuffer->inBlock++;
178 if (ringbuffer->inBlock == ringbuffer->blockCount)
179 ringbuffer->inBlock = 0;
181 deSemaphore_increment(ringbuffer->fullCount);
182 return DE_STREAMRESULT_SUCCESS;
185 static deStreamResult producerStream_deinit (deStreamData* stream)
189 producerStream_flush(stream);
191 /* \note mika Stream doesn't own ringbuffer, so it's not deallocated */
192 return DE_STREAMRESULT_SUCCESS;
195 static deStreamResult consumerStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* read)
197 deRingbuffer* ringbuffer = (deRingbuffer*)stream;
202 DE_ASSERT(ringbuffer);
204 while (*read < bufSize)
206 deInt32 writeSize = 0;
207 deUint8* src = DE_NULL;
208 deUint8* dst = DE_NULL;
210 /* If between blocks accuire new block */
211 if (ringbuffer->outPos == 0)
213 /* If consumer is set to stop after everything is consumed,
214 * do not block if there is no more input left
216 if (ringbuffer->consumerStopping)
218 /* Try to accuire new block, if can't there is no more input */
219 if (!deSemaphore_tryDecrement(ringbuffer->fullCount))
221 return DE_STREAMRESULT_END_OF_STREAM;
226 /* If not stopping block until there is more input */
227 deSemaphore_decrement(ringbuffer->fullCount);
228 /* Ringbuffer was set to stop */
229 if (ringbuffer->stopNotified)
231 ringbuffer->consumerStopping = DE_TRUE;
237 writeSize = deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read);
238 src = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos;
239 dst = (deUint8*)buf + *read;
241 deMemcpy(dst, src, (size_t)writeSize);
243 ringbuffer->outPos += writeSize;
246 /* Block is consumed move to next one (or "between" this and next block) */
247 if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock])
249 ringbuffer->blockUsage[ringbuffer->outBlock] = 0;
250 ringbuffer->outPos = 0;
251 ringbuffer->outBlock++;
253 if (ringbuffer->outBlock == ringbuffer->blockCount)
254 ringbuffer->outBlock = 0;
256 deSemaphore_increment(ringbuffer->emptyCount);
260 return DE_STREAMRESULT_SUCCESS;
264 static deStreamResult consumerStream_deinit (deStreamData* stream)
269 return DE_STREAMRESULT_SUCCESS;
272 /* There are no sensible errors so status is always good */
273 deStreamStatus dummy_getStatus (deStreamData* stream)
277 return DE_STREAMSTATUS_GOOD;
280 /* There are no sensible errors in ringbuffer */
281 static const char* dummy_getError (deStreamData* stream)
288 static const deIOStreamVFTable producerStreamVFTable = {
290 producerStream_write,
292 producerStream_flush,
293 producerStream_deinit,
297 static const deIOStreamVFTable consumerStreamVFTable = {
302 consumerStream_deinit,
306 void deProducerStream_init (deOutStream* stream, deRingbuffer* buffer)
308 stream->ioStream.streamData = (deStreamData*)buffer;
309 stream->ioStream.vfTable = &producerStreamVFTable;
312 void deConsumerStream_init (deInStream* stream, deRingbuffer* buffer)
314 stream->ioStream.streamData = (deStreamData*)buffer;
315 stream->ioStream.vfTable = &consumerStreamVFTable;