Set representative license: LGPL-2.1
[platform/upstream/7zip.git] / C / MtCoder.c
1 /* MtCoder.c -- Multi-thread Coder\r
2 2010-09-24 : Igor Pavlov : Public domain */\r
3 \r
4 #include <stdio.h>\r
5 \r
6 #include "MtCoder.h"\r
7 \r
8 void LoopThread_Construct(CLoopThread *p)\r
9 {\r
10   Thread_Construct(&p->thread);\r
11   Event_Construct(&p->startEvent);\r
12   Event_Construct(&p->finishedEvent);\r
13 }\r
14 \r
15 void LoopThread_Close(CLoopThread *p)\r
16 {\r
17   Thread_Close(&p->thread);\r
18   Event_Close(&p->startEvent);\r
19   Event_Close(&p->finishedEvent);\r
20 }\r
21 \r
22 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE LoopThreadFunc(void *pp)\r
23 {\r
24   CLoopThread *p = (CLoopThread *)pp;\r
25   for (;;)\r
26   {\r
27     if (Event_Wait(&p->startEvent) != 0)\r
28       return SZ_ERROR_THREAD;\r
29     if (p->stop)\r
30       return 0;\r
31     p->res = p->func(p->param);\r
32     if (Event_Set(&p->finishedEvent) != 0)\r
33       return SZ_ERROR_THREAD;\r
34   }\r
35 }\r
36 \r
37 WRes LoopThread_Create(CLoopThread *p)\r
38 {\r
39   p->stop = 0;\r
40   RINOK(AutoResetEvent_CreateNotSignaled(&p->startEvent));\r
41   RINOK(AutoResetEvent_CreateNotSignaled(&p->finishedEvent));\r
42   return Thread_Create(&p->thread, LoopThreadFunc, p);\r
43 }\r
44 \r
45 WRes LoopThread_StopAndWait(CLoopThread *p)\r
46 {\r
47   p->stop = 1;\r
48   if (Event_Set(&p->startEvent) != 0)\r
49     return SZ_ERROR_THREAD;\r
50   return Thread_Wait(&p->thread);\r
51 }\r
52 \r
53 WRes LoopThread_StartSubThread(CLoopThread *p) { return Event_Set(&p->startEvent); }\r
54 WRes LoopThread_WaitSubThread(CLoopThread *p) { return Event_Wait(&p->finishedEvent); }\r
55 \r
56 static SRes Progress(ICompressProgress *p, UInt64 inSize, UInt64 outSize)\r
57 {\r
58   return (p && p->Progress(p, inSize, outSize) != SZ_OK) ? SZ_ERROR_PROGRESS : SZ_OK;\r
59 }\r
60 \r
61 static void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)\r
62 {\r
63   unsigned i;\r
64   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)\r
65     p->inSizes[i] = p->outSizes[i] = 0;\r
66   p->totalInSize = p->totalOutSize = 0;\r
67   p->progress = progress;\r
68   p->res = SZ_OK;\r
69 }\r
70 \r
71 static void MtProgress_Reinit(CMtProgress *p, unsigned index)\r
72 {\r
73   p->inSizes[index] = 0;\r
74   p->outSizes[index] = 0;\r
75 }\r
76 \r
77 #define UPDATE_PROGRESS(size, prev, total) \\r
78   if (size != (UInt64)(Int64)-1) { total += size - prev; prev = size; }\r
79 \r
80 SRes MtProgress_Set(CMtProgress *p, unsigned index, UInt64 inSize, UInt64 outSize)\r
81 {\r
82   SRes res;\r
83   CriticalSection_Enter(&p->cs);\r
84   UPDATE_PROGRESS(inSize, p->inSizes[index], p->totalInSize)\r
85   UPDATE_PROGRESS(outSize, p->outSizes[index], p->totalOutSize)\r
86   if (p->res == SZ_OK)\r
87     p->res = Progress(p->progress, p->totalInSize, p->totalOutSize);\r
88   res = p->res;\r
89   CriticalSection_Leave(&p->cs);\r
90   return res;\r
91 }\r
92 \r
93 static void MtProgress_SetError(CMtProgress *p, SRes res)\r
94 {\r
95   CriticalSection_Enter(&p->cs);\r
96   if (p->res == SZ_OK)\r
97     p->res = res;\r
98   CriticalSection_Leave(&p->cs);\r
99 }\r
100 \r
101 static void MtCoder_SetError(CMtCoder* p, SRes res)\r
102 {\r
103   CriticalSection_Enter(&p->cs);\r
104   if (p->res == SZ_OK)\r
105     p->res = res;\r
106   CriticalSection_Leave(&p->cs);\r
107 }\r
108 \r
109 /* ---------- MtThread ---------- */\r
110 \r
111 void CMtThread_Construct(CMtThread *p, CMtCoder *mtCoder)\r
112 {\r
113   p->mtCoder = mtCoder;\r
114   p->outBuf = 0;\r
115   p->inBuf = 0;\r
116   Event_Construct(&p->canRead);\r
117   Event_Construct(&p->canWrite);\r
118   LoopThread_Construct(&p->thread);\r
119 }\r
120 \r
121 #define RINOK_THREAD(x) { if((x) != 0) return SZ_ERROR_THREAD; }\r
122 \r
123 static void CMtThread_CloseEvents(CMtThread *p)\r
124 {\r
125   Event_Close(&p->canRead);\r
126   Event_Close(&p->canWrite);\r
127 }\r
128 \r
129 static void CMtThread_Destruct(CMtThread *p)\r
130 {\r
131   CMtThread_CloseEvents(p);\r
132 \r
133   if (Thread_WasCreated(&p->thread.thread))\r
134   {\r
135     LoopThread_StopAndWait(&p->thread);\r
136     LoopThread_Close(&p->thread);\r
137   }\r
138 \r
139   if (p->mtCoder->alloc)\r
140     IAlloc_Free(p->mtCoder->alloc, p->outBuf);\r
141   p->outBuf = 0;\r
142 \r
143   if (p->mtCoder->alloc)\r
144     IAlloc_Free(p->mtCoder->alloc, p->inBuf);\r
145   p->inBuf = 0;\r
146 }\r
147 \r
148 #define MY_BUF_ALLOC(buf, size, newSize) \\r
149   if (buf == 0 || size != newSize) \\r
150   { IAlloc_Free(p->mtCoder->alloc, buf); \\r
151     size = newSize; buf = (Byte *)IAlloc_Alloc(p->mtCoder->alloc, size); \\r
152     if (buf == 0) return SZ_ERROR_MEM; }\r
153 \r
154 static SRes CMtThread_Prepare(CMtThread *p)\r
155 {\r
156   MY_BUF_ALLOC(p->inBuf, p->inBufSize, p->mtCoder->blockSize)\r
157   MY_BUF_ALLOC(p->outBuf, p->outBufSize, p->mtCoder->destBlockSize)\r
158 \r
159   p->stopReading = False;\r
160   p->stopWriting = False;\r
161   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canRead));\r
162   RINOK_THREAD(AutoResetEvent_CreateNotSignaled(&p->canWrite));\r
163 \r
164   return SZ_OK;\r
165 }\r
166 \r
167 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)\r
168 {\r
169   size_t size = *processedSize;\r
170   *processedSize = 0;\r
171   while (size != 0)\r
172   {\r
173     size_t curSize = size;\r
174     SRes res = stream->Read(stream, data, &curSize);\r
175     *processedSize += curSize;\r
176     data += curSize;\r
177     size -= curSize;\r
178     RINOK(res);\r
179     if (curSize == 0)\r
180       return SZ_OK;\r
181   }\r
182   return SZ_OK;\r
183 }\r
184 \r
185 #define GET_NEXT_THREAD(p) &p->mtCoder->threads[p->index == p->mtCoder->numThreads  - 1 ? 0 : p->index + 1]\r
186 \r
187 static SRes MtThread_Process(CMtThread *p, Bool *stop)\r
188 {\r
189   CMtThread *next;\r
190   *stop = True;\r
191   if (Event_Wait(&p->canRead) != 0)\r
192     return SZ_ERROR_THREAD;\r
193   \r
194   next = GET_NEXT_THREAD(p);\r
195   \r
196   if (p->stopReading)\r
197   {\r
198     next->stopReading = True;\r
199     return Event_Set(&next->canRead) == 0 ? SZ_OK : SZ_ERROR_THREAD;\r
200   }\r
201 \r
202   {\r
203     size_t size = p->mtCoder->blockSize;\r
204     size_t destSize = p->outBufSize;\r
205 \r
206     RINOK(FullRead(p->mtCoder->inStream, p->inBuf, &size));\r
207     next->stopReading = *stop = (size != p->mtCoder->blockSize);\r
208     if (Event_Set(&next->canRead) != 0)\r
209       return SZ_ERROR_THREAD;\r
210 \r
211     RINOK(p->mtCoder->mtCallback->Code(p->mtCoder->mtCallback, p->index,\r
212         p->outBuf, &destSize, p->inBuf, size, *stop));\r
213 \r
214     MtProgress_Reinit(&p->mtCoder->mtProgress, p->index);\r
215 \r
216     if (Event_Wait(&p->canWrite) != 0)\r
217       return SZ_ERROR_THREAD;\r
218     if (p->stopWriting)\r
219       return SZ_ERROR_FAIL;\r
220     if (p->mtCoder->outStream->Write(p->mtCoder->outStream, p->outBuf, destSize) != destSize)\r
221       return SZ_ERROR_WRITE;\r
222     return Event_Set(&next->canWrite) == 0 ? SZ_OK : SZ_ERROR_THREAD;\r
223   }\r
224 }\r
225 \r
226 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)\r
227 {\r
228   CMtThread *p = (CMtThread *)pp;\r
229   for (;;)\r
230   {\r
231     Bool stop;\r
232     CMtThread *next = GET_NEXT_THREAD(p);\r
233     SRes res = MtThread_Process(p, &stop);\r
234     if (res != SZ_OK)\r
235     {\r
236       MtCoder_SetError(p->mtCoder, res);\r
237       MtProgress_SetError(&p->mtCoder->mtProgress, res);\r
238       next->stopReading = True;\r
239       next->stopWriting = True;\r
240       Event_Set(&next->canRead);\r
241       Event_Set(&next->canWrite);\r
242       return res;\r
243     }\r
244     if (stop)\r
245       return 0;\r
246   }\r
247 }\r
248 \r
249 void MtCoder_Construct(CMtCoder* p)\r
250 {\r
251   unsigned i;\r
252   p->alloc = 0;\r
253   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)\r
254   {\r
255     CMtThread *t = &p->threads[i];\r
256     t->index = i;\r
257     CMtThread_Construct(t, p);\r
258   }\r
259   CriticalSection_Init(&p->cs);\r
260   CriticalSection_Init(&p->mtProgress.cs);\r
261 }\r
262 \r
263 void MtCoder_Destruct(CMtCoder* p)\r
264 {\r
265   unsigned i;\r
266   for (i = 0; i < NUM_MT_CODER_THREADS_MAX; i++)\r
267     CMtThread_Destruct(&p->threads[i]);\r
268   CriticalSection_Delete(&p->cs);\r
269   CriticalSection_Delete(&p->mtProgress.cs);\r
270 }\r
271 \r
272 SRes MtCoder_Code(CMtCoder *p)\r
273 {\r
274   unsigned i, numThreads = p->numThreads;\r
275   SRes res = SZ_OK;\r
276   p->res = SZ_OK;\r
277 \r
278   MtProgress_Init(&p->mtProgress, p->progress);\r
279 \r
280   for (i = 0; i < numThreads; i++)\r
281   {\r
282     RINOK(CMtThread_Prepare(&p->threads[i]));\r
283   }\r
284 \r
285   for (i = 0; i < numThreads; i++)\r
286   {\r
287     CMtThread *t = &p->threads[i];\r
288     CLoopThread *lt = &t->thread;\r
289 \r
290     if (!Thread_WasCreated(&lt->thread))\r
291     {\r
292       lt->func = ThreadFunc;\r
293       lt->param = t;\r
294 \r
295       if (LoopThread_Create(lt) != SZ_OK)\r
296       {\r
297         res = SZ_ERROR_THREAD;\r
298         break;\r
299       }\r
300     }\r
301   }\r
302 \r
303   if (res == SZ_OK)\r
304   {\r
305     unsigned j;\r
306     for (i = 0; i < numThreads; i++)\r
307     {\r
308       CMtThread *t = &p->threads[i];\r
309       if (LoopThread_StartSubThread(&t->thread) != SZ_OK)\r
310       {\r
311         res = SZ_ERROR_THREAD;\r
312         p->threads[0].stopReading = True;\r
313         break;\r
314       }\r
315     }\r
316 \r
317     Event_Set(&p->threads[0].canWrite);\r
318     Event_Set(&p->threads[0].canRead);\r
319 \r
320     for (j = 0; j < i; j++)\r
321       LoopThread_WaitSubThread(&p->threads[j].thread);\r
322   }\r
323 \r
324   for (i = 0; i < numThreads; i++)\r
325     CMtThread_CloseEvents(&p->threads[i]);\r
326   return (res == SZ_OK) ? p->res : res;\r
327 }\r