1 /*-------------------------------------------------------------------------
2 * drawElements C++ Base Library
3 * -----------------------------
5 * Copyright 2014 The Android Open Source Project
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
21 * \brief Block-based thread-safe queue.
22 *//*--------------------------------------------------------------------*/
24 #include "deBlockBuffer.hpp"
25 #include "deRandom.hpp"
26 #include "deThread.hpp"
36 namespace BlockBufferBasicTest
// Packs a 16-bit thread id into the upper half and a 16-bit payload into the
// lower half of `data`.
// The thread id is widened to deUint32 before shifting: a deUint16 operand is
// promoted to signed int, and shifting 0xffff (the id used for end messages)
// left by 16 does not fit in an int when int is 32 bits wide.
43 Message (deUint16 threadId, deUint16 payload)
44 : data(((deUint32)threadId << 16) | payload)
// Accessors that undo the packing done by the constructor: the thread id is
// taken from the bits above bit 15, the payload from the low 16 bits.
53 deUint16 getThreadId (void) const { return (deUint16)(data >> 16); }
54 deUint16 getPayload (void) const { return (deUint16)(data & 0xffff); }
// Buffer type shared by the producers and consumers of the basic test.
57 typedef BlockBuffer<Message> MessageBuffer;
// Consumer thread for the basic test: reads batches of Message from the
// shared buffer and records, per producer id, the last payload seen and a
// running payload sum that can be queried through getPayloadSum().
59 class Consumer : public Thread
// buffer: queue to read from. numProducers: number of entries allocated in
// each per-producer table; both tables start out zero-filled.
62 Consumer (MessageBuffer& buffer, int numProducers)
65 m_lastPayload.resize(numProducers, 0);
66 m_payloadSum.resize(numProducers, 0);
// The RNG is seeded with the producer count, so the sequence of batch sizes
// is reproducible.
71 Random rnd ((deUint32)m_lastPayload.size());
// Request a random batch of at least one message, capped by the length of
// tmpBuf (assuming getInt's bounds are inclusive -- TODO confirm).
77 int numToRead = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmpBuf));
// numRead bounds the processing loop below, i.e. it is treated as the number
// of messages actually placed in tmpBuf.
78 int numRead = m_buffer.tryRead(numToRead, &tmpBuf[0]);
80 for (int ndx = 0; ndx < numRead; ndx++)
82 const Message& msg = tmpBuf[ndx];
84 deUint16 threadId = msg.getThreadId();
// Thread id 0xffff marks an end message rather than a real producer.
86 if (threadId == 0xffff)
88 /* Feed back rest of messages to buffer (they are end messages) so other consumers wake up. */
// Re-queues everything that follows the current message in this batch.
91 m_buffer.write(numRead-ndx-1, &tmpBuf[ndx+1]);
100 /* Verify message. */
// The id must be usable as an index into the per-producer tables
// (presumably de::inBounds checks the half-open range [0, size) -- TODO confirm).
101 DE_TEST_ASSERT(de::inBounds<int>(threadId, 0, (int)m_lastPayload.size()));
// Payloads from one producer must be strictly increasing; a payload of 0 is
// accepted only while the last recorded payload for that producer is still 0.
102 DE_TEST_ASSERT((m_lastPayload[threadId] == 0 && msg.getPayload() == 0) || m_lastPayload[threadId] < msg.getPayload());
104 m_lastPayload[threadId] = msg.getPayload();
105 m_payloadSum[threadId] += (deUint32)msg.getPayload();
// Returns the sum of the payloads this consumer has recorded for the given
// producer id. No bounds check is performed on threadId here.
111 deUint32 getPayloadSum (deUint16 threadId) const
113 return m_payloadSum[threadId];
// Buffer this consumer reads from (held by reference, not owned).
117 MessageBuffer& m_buffer;
// Last payload seen, indexed by producer thread id.
118 vector<deUint16> m_lastPayload;
// Accumulated payload sum, indexed by producer thread id.
119 vector<deUint32> m_payloadSum;
// Producer thread for the basic test: writes messages tagged with its own
// thread id into the shared buffer, in randomly sized batches, until
// m_numMessages messages have been produced.
122 class Producer : public Thread
// buffer: queue to write to. threadId: id embedded in every message.
// numMessages: total number of messages this producer sends.
125 Producer (MessageBuffer& buffer, deUint16 threadId, int numMessages)
127 , m_threadId (threadId)
128 , m_numMessages (numMessages)
134 // Yield to give main thread chance to start other producers.
// Seeded with the thread id, so each producer's batch sizes are reproducible.
137 Random rnd (m_threadId);
// msgNdx counts the messages produced so far; its declaration is not among
// the lines shown here (presumably it starts at 0 -- TODO confirm).
141 while (msgNdx < m_numMessages)
// Batch size: at least 1 and at most the smaller of the messages still to
// send and the length of tmpBuf (assuming getInt's bounds are inclusive).
143 int writeSize = rnd.getInt(1, de::min(m_numMessages-msgNdx, DE_LENGTH_OF_ARRAY(tmpBuf)));
// Each payload is the running message index truncated to 16 bits.
144 for (int ndx = 0; ndx < writeSize; ndx++)
145 tmpBuf[ndx] = Message(m_threadId, (deUint16)msgNdx++);
147 m_buffer.write(writeSize, &tmpBuf[0]);
// Buffer this producer writes to (held by reference, not owned).
154 MessageBuffer& m_buffer;
// Basic producer/consumer test: for each iteration, a randomly configured
// buffer is shared by a random number of producers and consumers, and the
// payload sums recorded by the consumers are checked against a reference.
161 const int numIterations = 8;
162 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
// Seeding with the iteration index makes each iteration's configuration
// reproducible.
164 Random rnd (iterNdx);
// Randomized buffer geometry, thread counts and per-producer message count.
165 int numBlocks = rnd.getInt(2, 128);
166 int blockSize = rnd.getInt(1, 16);
167 int numProducers = rnd.getInt(1, 16);
168 int numConsumers = rnd.getInt(1, 16);
169 int dataSize = rnd.getInt(50, 200);
170 MessageBuffer buffer (blockSize, numBlocks);
171 vector<Producer*> producers;
172 vector<Consumer*> consumers;
// A producer's id is its creation index; each one sends dataSize messages.
174 for (int i = 0; i < numProducers; i++)
175 producers.push_back(new Producer(buffer, (deUint16)i, dataSize));
// Every consumer gets per-producer tables sized by numProducers.
177 for (int i = 0; i < numConsumers; i++)
178 consumers.push_back(new Consumer(buffer, numProducers));
// NOTE(review): the bodies of the next two loops are not visible in this
// listing; presumably they start the consumer and producer threads.
181 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
185 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
188 // Wait for producers.
189 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
192 // Write end messages for consumers.
// One end message (thread id 0xffff, payload 0) is written per consumer.
193 const Message endMsg(0xffff, 0);
194 for (int i = 0; i < numConsumers; i++)
195 buffer.write(1, &endMsg);
198 // Wait for consumers.
199 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
202 // Verify payload sums.
// Reference: the sum of the payloads 0 .. dataSize-1 that a single producer sends.
204 for (int i = 0; i < dataSize; i++)
205 refSum += (deUint32)(deUint16)i;
// For every producer, the payloads recorded for it, summed over all
// consumers, must equal the reference sum.
207 for (int i = 0; i < numProducers; i++)
210 for (int j = 0; j < numConsumers; j++)
211 cmpSum += consumers[j]->getPayloadSum((deUint16)i);
212 DE_TEST_ASSERT(refSum == cmpSum);
// NOTE(review): the bodies of the final two loops are not visible in this
// listing; presumably they delete the heap-allocated producers and consumers.
216 for (vector<Producer*>::iterator i = producers.begin(); i != producers.end(); i++)
218 for (vector<Consumer*>::iterator i = consumers.begin(); i != consumers.end(); i++)
223 } // BlockBufferBasicTest
225 namespace BlockBufferCancelTest
// Producer thread for the cancel test: writes randomly sized chunks of bytes
// into a BlockBuffer<deUint8> and handles the buffer's CanceledException.
228 class Producer : public Thread
// buffer: queue to write to (not owned). seed: presumably seeds the RNG used
// for chunk sizes; the initializer list is not visible in this listing.
231 Producer (BlockBuffer<deUint8>* buffer, deUint32 seed)
// Chunk size: at least 1, capped by the length of tmp
// (assuming getInt's bounds are inclusive -- TODO confirm).
244 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
248 m_buffer->write(blockSize, &tmp[0]);
// Cancellation is reported as an exception; the handler body is not visible
// here (presumably it just ends the thread's work).
253 catch (const BlockBuffer<deUint8>::CanceledException&)
261 BlockBuffer<deUint8>* m_buffer;
// Consumer thread for the cancel test: reads randomly sized chunks of bytes
// from a BlockBuffer<deUint8> and handles the buffer's CanceledException.
265 class Consumer : public Thread
// buffer: queue to read from (not owned). seed: presumably seeds the RNG used
// for chunk sizes; the initializer list is not visible in this listing.
268 Consumer (BlockBuffer<deUint8>* buffer, deUint32 seed)
// Chunk size: at least 1, capped by the length of tmp
// (assuming getInt's bounds are inclusive -- TODO confirm).
281 int blockSize = rnd.getInt(1, DE_LENGTH_OF_ARRAY(tmp));
285 m_buffer->read(blockSize, &tmp[0]);
// Cancellation is reported as an exception; the handler body is not visible
// here (presumably it just ends the thread's work).
287 catch (const BlockBuffer<deUint8>::CanceledException&)
295 BlockBuffer<deUint8>* m_buffer;
// Cancel test: a single byte buffer, declared before the iteration loop and
// therefore reused by every iteration, is shared by a random mix of
// Consumer and Producer threads.
// NOTE(review): the constructor arguments are presumably block size 64 and
// 16 blocks -- confirm against BlockBuffer's constructor.
301 BlockBuffer<deUint8> buffer (64, 16);
302 const int numIterations = 8;
304 for (int iterNdx = 0; iterNdx < numIterations; iterNdx++)
// The RNG is seeded from a hash of the iteration index, so each iteration's
// configuration is reproducible.
306 Random rnd (deInt32Hash(iterNdx));
307 int numThreads = rnd.getInt(1, 16);
// sleepMs is presumably how long the threads are left running before the
// buffer is canceled; the code using it is not visible in this listing.
308 int sleepMs = rnd.getInt(1, 200);
309 vector<Thread*> threads;
// Each thread is either a Consumer or a Producer with its own random seed;
// the condition that picks between them is not visible in this listing.
311 for (int i = 0; i < numThreads; i++)
314 threads.push_back(new Consumer(&buffer, rnd.getUint32()));
316 threads.push_back(new Producer(&buffer, rnd.getUint32()));
// NOTE(review): loop body not visible; presumably starts every thread.
320 for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
323 // Sleep for a while.
329 // Wait for threads to finish.
330 for (vector<Thread*>::iterator i = threads.begin(); i != threads.end(); i++)
// NOTE(review): loop body not visible; presumably deletes every thread object.
337 for (vector<Thread*>::iterator thread = threads.begin(); thread != threads.end(); ++thread)
342 } // BlockBufferCancelTest
// Self-test entry point: runs the basic producer/consumer test and then the
// cancel test.
344 void BlockBuffer_selfTest (void)
346 BlockBufferBasicTest::runTest();
347 BlockBufferCancelTest::runTest();