1 /*-------------------------------------------------------------------------
2 * drawElements Stream Library
3 * ---------------------------
5 * Copyright 2014 The Android Open Source Project
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
21 * \brief Buffered and threaded input and output streams
22 *//*--------------------------------------------------------------------*/
24 #include "deThreadStream.h"
25 #include "deStreamCpyThread.h"
26 #include "deRingbuffer.h"
29 typedef struct deThreadInStream_s
31 deRingbuffer* ringbuffer;
33 deInStream consumerStream;
34 deOutStream producerStream;
39 typedef struct deThreadOutStream_s
41 deRingbuffer* ringbuffer;
42 deInStream consumerStream;
43 deOutStream producerStream;
44 deStreamCpyThread* thread;
47 static void inStreamCopy (void* arg)
49 deThreadInStream* threadStream = (deThreadInStream*)arg;
51 deUint8* buffer = malloc(sizeof(deUint8) * (size_t)threadStream->bufferSize);
57 deStreamResult readResult = DE_STREAMRESULT_ERROR;
59 readResult = deInStream_read(threadStream->input, buffer, threadStream->bufferSize, &read);
60 DE_ASSERT(readResult != DE_STREAMRESULT_ERROR);
61 while (written < read)
65 /* \todo [mika] Handle errors */
66 deOutStream_write(&(threadStream->producerStream), buffer, read - written, &wrote);
71 if (readResult == DE_STREAMRESULT_END_OF_STREAM)
77 deOutStream_flush(&(threadStream->producerStream));
78 deRingbuffer_stop(threadStream->ringbuffer);
83 static deStreamResult threadInStream_read (deStreamData* stream, void* buf, deInt32 bufSize, deInt32* numRead)
85 deThreadInStream* threadStream = (deThreadInStream*)stream;
86 return deInStream_read(&(threadStream->consumerStream), buf, bufSize, numRead);
89 static const char* threadInStream_getError (deStreamData* stream)
91 deThreadInStream* threadStream = (deThreadInStream*)stream;
93 /* \todo [mika] Add handling for errors on thread stream */
94 return deInStream_getError(&(threadStream->consumerStream));
97 static deStreamStatus threadInStream_getStatus (deStreamData* stream)
99 deThreadInStream* threadStream = (deThreadInStream*)stream;
101 /* \todo [mika] Add handling for status on thread stream */
102 return deInStream_getStatus(&(threadStream->consumerStream));
105 /* \note [mika] Used by both in and out stream */
106 static deStreamResult threadStream_deinit (deStreamData* stream)
108 deThreadInStream* threadStream = (deThreadInStream*)stream;
110 deRingbuffer_stop(threadStream->ringbuffer);
112 deThread_join(threadStream->thread);
113 deThread_destroy(threadStream->thread);
115 deOutStream_deinit(&(threadStream->producerStream));
116 deInStream_deinit(&(threadStream->consumerStream));
118 deRingbuffer_destroy(threadStream->ringbuffer);
120 return DE_STREAMRESULT_SUCCESS;
123 static const deIOStreamVFTable threadInStreamVFTable = {
126 threadInStream_getError,
129 threadInStream_getStatus
132 void deThreadInStream_init (deInStream* stream, deInStream* input, int ringbufferBlockSize, int ringbufferBlockCount)
134 deThreadInStream* threadStream = DE_NULL;
136 threadStream = malloc(sizeof(deThreadInStream));
137 DE_ASSERT(threadStream);
139 threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
140 DE_ASSERT(threadStream->ringbuffer);
142 threadStream->bufferSize = ringbufferBlockSize;
143 threadStream->input = input;
144 deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
145 deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);
147 threadStream->thread = deThread_create(inStreamCopy, threadStream, DE_NULL);
148 stream->ioStream.vfTable = &threadInStreamVFTable;
149 stream->ioStream.streamData = threadStream;
152 static deStreamResult threadOutStream_write (deStreamData* stream, const void* buf, deInt32 bufSize, deInt32* numWritten)
154 deThreadOutStream* threadStream = (deThreadOutStream*)stream;
155 return deOutStream_write(&(threadStream->producerStream), buf, bufSize, numWritten);
158 static const char* threadOutStream_getError (deStreamData* stream)
160 deThreadOutStream* threadStream = (deThreadOutStream*)stream;
162 /* \todo [mika] Add handling for errors on thread stream */
163 return deOutStream_getError(&(threadStream->producerStream));
166 static deStreamStatus threadOutStream_getStatus (deStreamData* stream)
168 deThreadOutStream* threadStream = (deThreadOutStream*)stream;
170 /* \todo [mika] Add handling for errors on thread stream */
171 return deOutStream_getStatus(&(threadStream->producerStream));
174 static deStreamResult threadOutStream_flush (deStreamData* stream)
176 deThreadOutStream* threadStream = (deThreadOutStream*)stream;
178 return deOutStream_flush(&(threadStream->producerStream));
181 static const deIOStreamVFTable threadOutStreamVFTable = {
183 threadOutStream_write,
184 threadOutStream_getError,
185 threadOutStream_flush,
187 threadOutStream_getStatus
190 void deThreadOutStream_init (deOutStream* stream, deOutStream* output, int ringbufferBlockSize, int ringbufferBlockCount)
192 deThreadOutStream* threadStream = DE_NULL;
194 threadStream = malloc(sizeof(deThreadOutStream));
195 DE_ASSERT(threadStream);
197 threadStream->ringbuffer = deRingbuffer_create(ringbufferBlockSize, ringbufferBlockCount);
198 DE_ASSERT(threadStream->ringbuffer);
200 deProducerStream_init(&(threadStream->producerStream), threadStream->ringbuffer);
201 deConsumerStream_init(&(threadStream->consumerStream), threadStream->ringbuffer);
203 threadStream->thread = deStreamCpyThread_create(&(threadStream->consumerStream), output, ringbufferBlockSize);
204 stream->ioStream.vfTable = &threadOutStreamVFTable;
205 stream->ioStream.streamData = threadStream;