Increase no-data timeout to 5 seconds. am: d00b71cbc3 am: f8083c0977 am: 5e52d6630c...
[platform/upstream/VK-GL-CTS.git] / framework / delibs / destream / deRingbuffer.c
1 /*-------------------------------------------------------------------------
2  * drawElements Stream Library
3  * ---------------------------
4  *
5  * Copyright 2014 The Android Open Source Project
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  *
19  *//*!
20  * \file
21  * \brief Thread safe ringbuffer
22  *//*--------------------------------------------------------------------*/
23 #include "deRingbuffer.h"
24
25 #include "deInt32.h"
26 #include "deMemory.h"
27 #include "deSemaphore.h"
28
29 #include <stdlib.h>
30 #include <stdio.h>
31
32 struct deRingbuffer_s
33 {
34         deInt32                 blockSize;
35         deInt32                 blockCount;
36         deInt32*                blockUsage;
37         deUint8*                buffer;
38
39         deSemaphore             emptyCount;
40         deSemaphore             fullCount;
41
42         deInt32                 outBlock;
43         deInt32                 outPos;
44
45         deInt32                 inBlock;
46         deInt32                 inPos;
47
48         deBool                  stopNotified;
49         deBool                  consumerStopping;
50 };
51
52 deRingbuffer* deRingbuffer_create (deInt32 blockSize, deInt32 blockCount)
53 {
54         deRingbuffer* ringbuffer = (deRingbuffer*)deCalloc(sizeof(deRingbuffer));
55
56         DE_ASSERT(ringbuffer);
57         DE_ASSERT(blockCount > 0);
58         DE_ASSERT(blockSize > 0);
59
60         ringbuffer->blockSize   = blockSize;
61         ringbuffer->blockCount  = blockCount;
62         ringbuffer->buffer              = (deUint8*)deMalloc(sizeof(deUint8) * (size_t)blockSize * (size_t)blockCount);
63         ringbuffer->blockUsage  = (deInt32*)deMalloc(sizeof(deUint32) * (size_t)blockCount);
64         ringbuffer->emptyCount  = deSemaphore_create(ringbuffer->blockCount, DE_NULL);
65         ringbuffer->fullCount   = deSemaphore_create(0, DE_NULL);
66
67         if (!ringbuffer->buffer         ||
68                 !ringbuffer->blockUsage ||
69                 !ringbuffer->emptyCount ||
70                 !ringbuffer->fullCount)
71         {
72                 if (ringbuffer->emptyCount)
73                         deSemaphore_destroy(ringbuffer->emptyCount);
74                 if (ringbuffer->fullCount)
75                         deSemaphore_destroy(ringbuffer->fullCount);
76                 deFree(ringbuffer->buffer);
77                 deFree(ringbuffer->blockUsage);
78                 deFree(ringbuffer);
79                 return DE_NULL;
80         }
81
82         memset(ringbuffer->blockUsage, 0, sizeof(deInt32) * (size_t)blockCount);
83
84         ringbuffer->outBlock    = 0;
85         ringbuffer->outPos              = 0;
86
87         ringbuffer->inBlock             = 0;
88         ringbuffer->inPos               = 0;
89
90         ringbuffer->stopNotified                = DE_FALSE;
91         ringbuffer->consumerStopping    = DE_FALSE;
92
93         return ringbuffer;
94 }
95
96 void deRingbuffer_stop (deRingbuffer* ringbuffer)
97 {
98         /* Set notify to true and increment fullCount to let consumer continue */
99         ringbuffer->stopNotified = DE_TRUE;
100         deSemaphore_increment(ringbuffer->fullCount);
101 }
102
103 void deRingbuffer_destroy (deRingbuffer* ringbuffer)
104 {
105         deSemaphore_destroy(ringbuffer->emptyCount);
106         deSemaphore_destroy(ringbuffer->fullCount);
107
108         free(ringbuffer->buffer);
109         free(ringbuffer->blockUsage);
110         free(ringbuffer);
111 }
112
113 static deStreamResult producerStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* written)
114 {
115         deRingbuffer* ringbuffer = (deRingbuffer*)stream;
116
117         DE_ASSERT(stream);
118         /* If ringbuffer is stopping return error on write */
119         if (ringbuffer->stopNotified)
120         {
121                 DE_ASSERT(DE_FALSE);
122                 return DE_STREAMRESULT_ERROR;
123         }
124
125         *written = 0;
126
127         /* Write while more data available */
128         while (*written < bufSize)
129         {
130                 deInt32         writeSize       = 0;
131                 deUint8*        src                     = DE_NULL;
132                 deUint8*        dst                     = DE_NULL;
133
134                 /* If between blocks accuire new block */
135                 if (ringbuffer->inPos == 0)
136                 {
137                         deSemaphore_decrement(ringbuffer->emptyCount);
138                 }
139
140                 writeSize       = deMin32(ringbuffer->blockSize - ringbuffer->inPos, bufSize - *written);
141                 dst                     = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->inBlock + ringbuffer->inPos;
142                 src                     = (deUint8*)buf + *written;
143
144                 deMemcpy(dst, src, (size_t)writeSize);
145
146                 ringbuffer->inPos += writeSize;
147                 *written += writeSize;
148                 ringbuffer->blockUsage[ringbuffer->inBlock] += writeSize;
149
150                 /* Block is full move to next one (or "between" this and next block) */
151                 if (ringbuffer->inPos == ringbuffer->blockSize)
152                 {
153                         ringbuffer->inPos = 0;
154                         ringbuffer->inBlock++;
155
156                         if (ringbuffer->inBlock == ringbuffer->blockCount)
157                                 ringbuffer->inBlock = 0;
158                         deSemaphore_increment(ringbuffer->fullCount);
159                 }
160         }
161
162         return DE_STREAMRESULT_SUCCESS;
163 }
164
165 static deStreamResult producerStream_flush (deStreamData* stream)
166 {
167         deRingbuffer* ringbuffer = (deRingbuffer*)stream;
168
169         DE_ASSERT(stream);
170
171         /* No blocks reserved by producer */
172         if (ringbuffer->inPos == 0)
173                 return DE_STREAMRESULT_SUCCESS;
174
175         ringbuffer->inPos               = 0;
176         ringbuffer->inBlock++;
177
178         if (ringbuffer->inBlock == ringbuffer->blockCount)
179                 ringbuffer->inBlock = 0;
180
181         deSemaphore_increment(ringbuffer->fullCount);
182         return DE_STREAMRESULT_SUCCESS;
183 }
184
185 static deStreamResult producerStream_deinit (deStreamData* stream)
186 {
187         DE_ASSERT(stream);
188
189         producerStream_flush(stream);
190
191         /* \note mika Stream doesn't own ringbuffer, so it's not deallocated */
192         return DE_STREAMRESULT_SUCCESS;
193 }
194
195 static deStreamResult consumerStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* read)
196 {
197         deRingbuffer* ringbuffer = (deRingbuffer*)stream;
198
199         DE_ASSERT(stream);
200
201         *read = 0;
202         DE_ASSERT(ringbuffer);
203
204         while (*read < bufSize)
205         {
206                 deInt32         writeSize       = 0;
207                 deUint8*        src                     = DE_NULL;
208                 deUint8*        dst                     = DE_NULL;
209
210                 /* If between blocks accuire new block */
211                 if (ringbuffer->outPos == 0)
212                 {
213                         /* If consumer is set to stop after everything is consumed,
214                          * do not block if there is no more input left
215                          */
216                         if (ringbuffer->consumerStopping)
217                         {
218                                 /* Try to accuire new block, if can't there is no more input */
219                                 if (!deSemaphore_tryDecrement(ringbuffer->fullCount))
220                                 {
221                                         return DE_STREAMRESULT_END_OF_STREAM;
222                                 }
223                         }
224                         else
225                         {
226                                 /* If not stopping block until there is more input */
227                                 deSemaphore_decrement(ringbuffer->fullCount);
228                                 /* Ringbuffer was set to stop */
229                                 if (ringbuffer->stopNotified)
230                                 {
231                                         ringbuffer->consumerStopping = DE_TRUE;
232                                 }
233                         }
234
235                 }
236
237                 writeSize       = deMin32(ringbuffer->blockUsage[ringbuffer->outBlock] - ringbuffer->outPos, bufSize - *read);
238                 src                     = ringbuffer->buffer + ringbuffer->blockSize * ringbuffer->outBlock + ringbuffer->outPos;
239                 dst                     = (deUint8*)buf + *read;
240
241                 deMemcpy(dst, src, (size_t)writeSize);
242
243                 ringbuffer->outPos += writeSize;
244                 *read += writeSize;
245
246                 /* Block is consumed move to next one (or "between" this and next block) */
247                 if (ringbuffer->outPos == ringbuffer->blockUsage[ringbuffer->outBlock])
248                 {
249                         ringbuffer->blockUsage[ringbuffer->outBlock] = 0;
250                         ringbuffer->outPos = 0;
251                         ringbuffer->outBlock++;
252
253                         if (ringbuffer->outBlock == ringbuffer->blockCount)
254                                 ringbuffer->outBlock = 0;
255
256                         deSemaphore_increment(ringbuffer->emptyCount);
257                 }
258         }
259
260         return DE_STREAMRESULT_SUCCESS;
261 }
262
263
264 static deStreamResult consumerStream_deinit (deStreamData* stream)
265 {
266         DE_ASSERT(stream);
267         DE_UNREF(stream);
268
269         return DE_STREAMRESULT_SUCCESS;
270 }
271
272 /* There are no sensible errors so status is always good */
273 deStreamStatus dummy_getStatus (deStreamData* stream)
274 {
275         DE_UNREF(stream);
276
277         return DE_STREAMSTATUS_GOOD;
278 }
279
280 /* There are no sensible errors in ringbuffer */
281 static const char* dummy_getError (deStreamData* stream)
282 {
283         DE_ASSERT(stream);
284         DE_UNREF(stream);
285         return DE_NULL;
286 }
287
288 static const deIOStreamVFTable producerStreamVFTable = {
289         DE_NULL,
290         producerStream_write,
291         dummy_getError,
292         producerStream_flush,
293         producerStream_deinit,
294         dummy_getStatus
295 };
296
297 static const deIOStreamVFTable consumerStreamVFTable = {
298         consumerStream_read,
299         DE_NULL,
300         dummy_getError,
301         DE_NULL,
302         consumerStream_deinit,
303         dummy_getStatus
304 };
305
306 void deProducerStream_init (deOutStream* stream, deRingbuffer* buffer)
307 {
308         stream->ioStream.streamData = (deStreamData*)buffer;
309         stream->ioStream.vfTable = &producerStreamVFTable;
310 }
311
312 void deConsumerStream_init (deInStream* stream, deRingbuffer* buffer)
313 {
314         stream->ioStream.streamData = (deStreamData*)buffer;
315         stream->ioStream.vfTable = &consumerStreamVFTable;
316 }