Initial version of libomxil-vc4 for RPI3
[platform/adaptation/broadcom/libomxil-vc4.git] / containers / core / containers_io.c
1 /*
2 Copyright (c) 2012, Broadcom Europe Ltd
3 All rights reserved.
4
5 Redistribution and use in source and binary forms, with or without
6 modification, are permitted provided that the following conditions are met:
7     * Redistributions of source code must retain the above copyright
8       notice, this list of conditions and the following disclaimer.
9     * Redistributions in binary form must reproduce the above copyright
10       notice, this list of conditions and the following disclaimer in the
11       documentation and/or other materials provided with the distribution.
12     * Neither the name of the copyright holder nor the
13       names of its contributors may be used to endorse or promote products
14       derived from this software without specific prior written permission.
15
16 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17 ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
20 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
23 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26 */
27
28 #include <stdlib.h>
29 #include <string.h>
30 #include <stdio.h>
31
32 #include "containers/containers.h"
33 #include "containers/core/containers_io.h"
34 #include "containers/core/containers_common.h"
35 #include "containers/core/containers_utils.h"
36 #include "containers/core/containers_uri.h"
37
38 #define MAX_NUM_CACHED_AREAS 16
39 #define MAX_NUM_MEMORY_AREAS 4
40 #define NUM_TMP_MEMORY_AREAS 2
41 #define MEM_CACHE_READ_MAX_SIZE (32*1024) /* Needs to be a power of 2 */
42 #define MEM_CACHE_WRITE_MAX_SIZE (128*1024) /* Needs to be a power of 2 */
43 #define MEM_CACHE_TMP_MAX_SIZE (32*1024) /* Needs to be a power of 2 */
44 #define MEM_CACHE_ALIGNMENT (1*1024) /* Needs to be a power of 2 */
45 #define MEM_CACHE_AREA_READ_MAX_SIZE (4*1024*1024) /* Needs to be a power of 2 */
46
47 typedef struct VC_CONTAINER_IO_PRIVATE_CACHE_T
48 {
49    int64_t start; /**< Offset to the start of the cached area in the stream */
50    int64_t end;    /**< Offset to the end of the cached area in the stream */
51
52    int64_t offset; /**< Offset of the currently cached data in the stream */
53    size_t size;    /**< Size of the cached area */
54    bool dirty;     /**< Whether the cache is dirty and needs to be written back */
55
56    size_t position; /**< Current position in the cache */
57
58    uint8_t *buffer;          /**< Pointer to the start of the valid cache area */
59    uint8_t *buffer_end;      /**< Pointer to the end of the cache */
60
61    unsigned int mem_max_size; /**< Maximum size of the memory cache */
62    unsigned int mem_size; /**< Size of the memory cache */
63    uint8_t *mem;          /**< Pointer to the memory cache */
64
65    VC_CONTAINER_IO_T *io;
66
67 } VC_CONTAINER_IO_PRIVATE_CACHE_T;
68
69 typedef struct VC_CONTAINER_IO_PRIVATE_T
70 {
71    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache; /**< Current cache */
72
73    unsigned int caches_num;
74    VC_CONTAINER_IO_PRIVATE_CACHE_T caches;
75
76    unsigned int cached_areas_num;
77    VC_CONTAINER_IO_PRIVATE_CACHE_T cached_areas[MAX_NUM_CACHED_AREAS];
78
79    int64_t actual_offset;
80
81    struct VC_CONTAINER_IO_ASYNC_T *async_io;
82
83 } VC_CONTAINER_IO_PRIVATE_T;
84
85 /*****************************************************************************/
86 VC_CONTAINER_STATUS_T vc_container_io_file_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
87                                                  VC_CONTAINER_IO_MODE_T mode );
88 VC_CONTAINER_STATUS_T vc_container_io_null_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
89                                                  VC_CONTAINER_IO_MODE_T mode );
90 VC_CONTAINER_STATUS_T vc_container_io_net_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
91                                                  VC_CONTAINER_IO_MODE_T mode );
92 VC_CONTAINER_STATUS_T vc_container_io_pktfile_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
93                                                  VC_CONTAINER_IO_MODE_T mode );
94 VC_CONTAINER_STATUS_T vc_container_io_http_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
95                                                  VC_CONTAINER_IO_MODE_T mode );
96 static VC_CONTAINER_STATUS_T io_seek_not_seekable(VC_CONTAINER_IO_T *p_ctx, int64_t offset);
97
98 static size_t vc_container_io_cache_read( VC_CONTAINER_IO_T *p_ctx,
99    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, uint8_t *data, size_t size );
100 static int32_t vc_container_io_cache_write( VC_CONTAINER_IO_T *p_ctx,
101    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, const uint8_t *data, size_t size );
102 static VC_CONTAINER_STATUS_T vc_container_io_cache_seek( VC_CONTAINER_IO_T *p_ctx,
103    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int64_t offset );
104 static size_t vc_container_io_cache_refill( VC_CONTAINER_IO_T *p_ctx,
105    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache );
106 static size_t vc_container_io_cache_flush( VC_CONTAINER_IO_T *p_ctx,
107    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete );
108
109 static struct VC_CONTAINER_IO_ASYNC_T *async_io_start( VC_CONTAINER_IO_T *io, int num_areas, VC_CONTAINER_STATUS_T * );
110 static VC_CONTAINER_STATUS_T async_io_stop( struct VC_CONTAINER_IO_ASYNC_T *ctx );
111 static int async_io_write( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_IO_PRIVATE_CACHE_T *cache );
112 static VC_CONTAINER_STATUS_T async_io_wait_complete( struct VC_CONTAINER_IO_ASYNC_T *ctx,
113                                                      VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete  );
114 static void async_io_stats_initialise( struct VC_CONTAINER_IO_ASYNC_T *ctx, int enable );
115 static void async_io_stats_get( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_WRITE_STATS_T *stats );
116
117 /*****************************************************************************/
118 static VC_CONTAINER_IO_T *vc_container_io_open_core( const char *uri, VC_CONTAINER_IO_MODE_T mode,
119                                                      VC_CONTAINER_IO_CAPABILITIES_T capabilities,
120                                                      bool b_open, VC_CONTAINER_STATUS_T *p_status )
121 {
122    VC_CONTAINER_STATUS_T status = VC_CONTAINER_SUCCESS;
123    VC_CONTAINER_IO_T *p_ctx = 0;
124    VC_CONTAINER_IO_PRIVATE_T *private = 0;
125    unsigned int uri_length, caches = 0, cache_max_size, num_areas = MAX_NUM_MEMORY_AREAS;
126
127    /* XXX */
128    uri_length = strlen(uri) + 1;
129
130    /* Allocate our context before trying out the different io modules */
131    p_ctx = malloc( sizeof(*p_ctx) + sizeof(*private) + uri_length);
132    if(!p_ctx) { status = VC_CONTAINER_ERROR_OUT_OF_MEMORY; goto error; }
133    memset(p_ctx, 0, sizeof(*p_ctx) + sizeof(*private) + uri_length );
134    p_ctx->priv = private = (VC_CONTAINER_IO_PRIVATE_T *)&p_ctx[1];
135    p_ctx->uri = (char *)&private[1];
136    memcpy((char *)p_ctx->uri, uri, uri_length);
137    p_ctx->uri_parts = vc_uri_create();
138    if(!p_ctx->uri_parts) { status = VC_CONTAINER_ERROR_OUT_OF_MEMORY; goto error; }
139    vc_uri_parse(p_ctx->uri_parts, uri);
140
141    if (b_open)
142    {
143       /* Open the actual i/o module */
144       status = vc_container_io_null_open(p_ctx, uri, mode);
145       if(status) status = vc_container_io_net_open(p_ctx, uri, mode);
146       if(status) status = vc_container_io_pktfile_open(p_ctx, uri, mode);
147 #ifdef ENABLE_CONTAINER_IO_HTTP
148       if(status) status = vc_container_io_http_open(p_ctx, uri, mode);
149 #endif
150       if(status) status = vc_container_io_file_open(p_ctx, uri, mode);
151       if(status != VC_CONTAINER_SUCCESS) goto error;
152
153       if(!p_ctx->pf_seek || (p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK))
154       {
155          p_ctx->capabilities |= VC_CONTAINER_IO_CAPS_CANT_SEEK;
156          p_ctx->pf_seek = io_seek_not_seekable;
157       }
158    }
159    else
160    {
161       /* We're only creating an empty container i/o */
162       p_ctx->capabilities = capabilities;
163    }
164
165    if(p_ctx->capabilities & VC_CONTAINER_IO_CAPS_NO_CACHING)
166       caches = 1;
167
168    if(mode == VC_CONTAINER_IO_MODE_WRITE) cache_max_size = MEM_CACHE_WRITE_MAX_SIZE;
169    else cache_max_size = MEM_CACHE_READ_MAX_SIZE;
170
171    if(mode == VC_CONTAINER_IO_MODE_WRITE &&
172       vc_uri_path_extension(p_ctx->uri_parts) &&
173       !strcasecmp(vc_uri_path_extension(p_ctx->uri_parts), "tmp"))
174    {
175       caches = 1;
176       cache_max_size = MEM_CACHE_TMP_MAX_SIZE;
177       num_areas = NUM_TMP_MEMORY_AREAS;
178    }
179
180    /* Check if the I/O needs caching */
181    if(caches)
182    {
183       VC_CONTAINER_IO_PRIVATE_CACHE_T *cache = &p_ctx->priv->caches;
184       cache->mem_max_size = cache_max_size;
185       cache->mem_size = cache->mem_max_size;
186       cache->io = p_ctx;
187       cache->mem = malloc(p_ctx->priv->caches.mem_size);
188       if(cache->mem)
189       {      
190          cache->buffer = cache->mem;
191          cache->buffer_end = cache->mem + cache->mem_size;
192          p_ctx->priv->caches_num = 1;
193       }
194    }
195
196    if(p_ctx->priv->caches_num)
197       p_ctx->priv->cache = &p_ctx->priv->caches;
198
199
200    /* Try to start an asynchronous io if we're in write mode and we've got at least 2 cache memory areas */
201    if(mode == VC_CONTAINER_IO_MODE_WRITE && p_ctx->priv->cache && num_areas >= 2)
202       p_ctx->priv->async_io = async_io_start( p_ctx, num_areas, 0 );
203
204  end:
205    if(p_status) *p_status = status;
206    return p_ctx;
207
208  error:
209    if(p_ctx) vc_uri_release(p_ctx->uri_parts);
210    if(p_ctx) free(p_ctx);
211    p_ctx = 0;
212    goto end;
213 }
214
215 /*****************************************************************************/
216 VC_CONTAINER_IO_T *vc_container_io_open( const char *uri, VC_CONTAINER_IO_MODE_T mode,
217                                          VC_CONTAINER_STATUS_T *p_status )
218 {
219    return vc_container_io_open_core( uri, mode, 0, true, p_status );
220 }
221
222 /*****************************************************************************/
223 VC_CONTAINER_IO_T *vc_container_io_create( const char *uri, VC_CONTAINER_IO_MODE_T mode,
224                                            VC_CONTAINER_IO_CAPABILITIES_T capabilities,
225                                            VC_CONTAINER_STATUS_T *p_status )
226 {
227    return vc_container_io_open_core( uri, mode, capabilities, false, p_status );
228 }
229
230 /*****************************************************************************/
231 VC_CONTAINER_STATUS_T vc_container_io_close( VC_CONTAINER_IO_T *p_ctx )
232 {
233    unsigned int i;
234
235    if(p_ctx)
236    {
237       if(p_ctx->priv)
238       {
239          if(p_ctx->priv->caches_num)
240          {
241             if(p_ctx->priv->caches.dirty)
242                vc_container_io_cache_flush( p_ctx, &p_ctx->priv->caches, 1 );
243          }
244          
245          if(p_ctx->priv->async_io)
246             async_io_stop( p_ctx->priv->async_io );
247          else if(p_ctx->priv->caches_num)
248             free(p_ctx->priv->caches.mem);
249
250          for(i = 0; i < p_ctx->priv->cached_areas_num; i++)
251             free(p_ctx->priv->cached_areas[i].mem);
252          
253          if(p_ctx->pf_close)
254             p_ctx->pf_close(p_ctx);
255       }
256       vc_uri_release(p_ctx->uri_parts);
257       free(p_ctx);
258    }
259    return VC_CONTAINER_SUCCESS;
260 }
261
262 /*****************************************************************************/
263 size_t vc_container_io_peek(VC_CONTAINER_IO_T *p_ctx, void *buffer, size_t size)
264 {
265    size_t ret;
266
267    if(p_ctx->priv->cache)
268    {
269       /* FIXME: do something a bit more clever than this */
270       int64_t offset = p_ctx->offset;
271       ret = vc_container_io_read(p_ctx, buffer, size);
272       vc_container_io_seek(p_ctx, offset);
273       return ret;
274    }
275
276    if (p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK)
277       return 0;
278
279    ret = p_ctx->pf_read(p_ctx, buffer, size);
280    p_ctx->pf_seek(p_ctx, p_ctx->offset);
281    return ret;
282 }
283
284 /*****************************************************************************/
285 size_t vc_container_io_read(VC_CONTAINER_IO_T *p_ctx, void *buffer, size_t size)
286 {
287    size_t ret;
288
289    if(p_ctx->priv->cache)
290       ret = vc_container_io_cache_read( p_ctx, p_ctx->priv->cache, (uint8_t*)buffer, size );
291    else
292    {
293       ret = p_ctx->pf_read(p_ctx, buffer, size);
294       p_ctx->priv->actual_offset += ret;
295    }
296
297    p_ctx->offset += ret;
298    return ret;
299 }
300
301 /*****************************************************************************/
302 size_t vc_container_io_write(VC_CONTAINER_IO_T *p_ctx, const void *buffer, size_t size)
303 {
304    int32_t ret;
305
306    if(p_ctx->priv->cache)
307       ret = vc_container_io_cache_write( p_ctx, p_ctx->priv->cache, (const uint8_t*)buffer, size );
308    else
309    {
310       ret = p_ctx->pf_write(p_ctx, buffer, size);
311       p_ctx->priv->actual_offset += ret;
312    }
313
314    p_ctx->offset += ret;
315    return ret < 0 ? 0 : ret;
316 }
317
318 /*****************************************************************************/
319 size_t vc_container_io_skip(VC_CONTAINER_IO_T *p_ctx, size_t size)
320 {
321    if(!size) return 0;
322
323    if(size < 8)
324    {
325       uint8_t value[8];
326       return vc_container_io_read(p_ctx, value, size);
327    }
328
329    if(p_ctx->priv->cache)
330    {
331       if(vc_container_io_cache_seek(p_ctx, p_ctx->priv->cache, p_ctx->offset + size)) return 0;
332       p_ctx->offset += size;
333       return size;
334    }
335
336    if(vc_container_io_seek(p_ctx, p_ctx->offset + size)) return 0;
337    return size;
338 }
339
340 /*****************************************************************************/
341 VC_CONTAINER_STATUS_T vc_container_io_seek(VC_CONTAINER_IO_T *p_ctx, int64_t offset)
342 {
343    VC_CONTAINER_STATUS_T status;
344    unsigned int i;
345
346    /* Check if the requested position is in one of the cached areas */
347    for(i = 0; i < p_ctx->priv->cached_areas_num; i++)
348    {
349       VC_CONTAINER_IO_PRIVATE_CACHE_T *cache = &p_ctx->priv->cached_areas[i];
350       if(offset >= cache->start && offset < cache->end)
351       {
352          p_ctx->priv->cache = cache;
353          break;
354       }
355    }
356    if(i == p_ctx->priv->cached_areas_num)
357       p_ctx->priv->cache = p_ctx->priv->caches_num ? &p_ctx->priv->caches : 0;
358
359    if(p_ctx->priv->cache)
360    {
361       status = vc_container_io_cache_seek( p_ctx, p_ctx->priv->cache, offset );
362       if(status == VC_CONTAINER_SUCCESS) p_ctx->offset = offset;
363       return status;
364    }
365
366    if(p_ctx->status == VC_CONTAINER_SUCCESS &&
367       offset == p_ctx->offset) return VC_CONTAINER_SUCCESS;
368
369    status = p_ctx->pf_seek(p_ctx, offset);
370    if(status == VC_CONTAINER_SUCCESS) p_ctx->offset = offset;
371    p_ctx->priv->actual_offset = p_ctx->offset;
372    return status;
373 }
374
375 /*****************************************************************************/
376 static VC_CONTAINER_STATUS_T io_seek_not_seekable(VC_CONTAINER_IO_T *p_ctx, int64_t offset)
377 {
378    VC_CONTAINER_IO_PRIVATE_T *private = p_ctx->priv;
379
380    vc_container_assert(offset >= private->actual_offset);
381    if(offset == private->actual_offset)  return VC_CONTAINER_SUCCESS;
382
383    if(offset < private->actual_offset)
384    {
385       p_ctx->status = VC_CONTAINER_ERROR_EOS;
386       return p_ctx->status;
387    }
388
389    offset -= private->actual_offset;
390    while(offset && !p_ctx->status)
391    {
392       uint8_t value[64];
393       unsigned int ret, size = MIN(offset, 64);
394       ret = p_ctx->pf_read(p_ctx, value, size);
395       if(ret != size) p_ctx->status = VC_CONTAINER_ERROR_EOS;
396       offset -= ret;
397    }
398    return p_ctx->status;
399 }
400
401 /*****************************************************************************/
402 VC_CONTAINER_STATUS_T vc_container_io_control_list(VC_CONTAINER_IO_T *context, VC_CONTAINER_CONTROL_T operation, va_list args)
403 {
404    VC_CONTAINER_STATUS_T status = VC_CONTAINER_ERROR_UNSUPPORTED_OPERATION;
405
406    if (context->pf_control)
407       status = context->pf_control(context, operation, args);
408
409    /* Option to add generic I/O control here */
410
411    if(operation == VC_CONTAINER_CONTROL_IO_FLUSH && context->priv->cache)
412    {
413       status = VC_CONTAINER_SUCCESS;
414       (void)vc_container_io_cache_flush( context, context->priv->cache, 1 );
415    }
416
417    if(operation == VC_CONTAINER_CONTROL_SET_IO_PERF_STATS && context->priv->async_io)
418    {
419       status = VC_CONTAINER_SUCCESS;
420       async_io_stats_initialise(context->priv->async_io, va_arg(args, int));
421    }
422
423    if(operation == VC_CONTAINER_CONTROL_GET_IO_PERF_STATS && context->priv->async_io)
424    {
425       status = VC_CONTAINER_SUCCESS;
426       async_io_stats_get(context->priv->async_io, va_arg(args, VC_CONTAINER_WRITE_STATS_T *));
427    }
428
429    return status;
430 }
431
432 /*****************************************************************************/
433 VC_CONTAINER_STATUS_T vc_container_io_control(VC_CONTAINER_IO_T *context, VC_CONTAINER_CONTROL_T operation, ...)
434 {
435    VC_CONTAINER_STATUS_T result;
436    va_list args;
437
438    va_start(args, operation);
439    result = vc_container_io_control_list(context, operation, args);
440    va_end(args);
441
442    return result;
443 }
444
445 /*****************************************************************************/
446 size_t vc_container_io_cache(VC_CONTAINER_IO_T *p_ctx, size_t size)
447 {
448    VC_CONTAINER_IO_PRIVATE_T *private = p_ctx->priv;
449    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, *main_cache;
450    VC_CONTAINER_STATUS_T status;
451
452    /* Sanity checking */
453    if(private->cached_areas_num >= MAX_NUM_CACHED_AREAS) return 0;
454
455    cache = &private->cached_areas[private->cached_areas_num];
456    cache->start = p_ctx->offset;
457    cache->end = cache->start + size;
458    cache->offset = p_ctx->offset;
459    cache->position = 0;
460    cache->size = 0;
461    cache->io = p_ctx;
462
463    /* Set the size of the cache area depending on the capabilities of the i/o */
464    if(p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK)
465       cache->mem_max_size = MEM_CACHE_AREA_READ_MAX_SIZE;
466    else if((p_ctx->capabilities & VC_CONTAINER_IO_CAPS_SEEK_SLOW) &&
467            size <= MEM_CACHE_AREA_READ_MAX_SIZE)
468       cache->mem_max_size = MEM_CACHE_AREA_READ_MAX_SIZE;
469    else
470       cache->mem_max_size = MEM_CACHE_READ_MAX_SIZE;
471
472    cache->mem_size = size;
473    if(cache->mem_size > cache->mem_max_size) cache->mem_size = cache->mem_max_size;
474    cache->mem = malloc(cache->mem_size);
475
476    cache->buffer = cache->mem;
477    cache->buffer_end = cache->mem + cache->mem_size;
478
479    if(!cache->mem) return 0;
480    private->cached_areas_num++;
481
482    /* Copy any data we've got in the current cache into the new cache */
483    main_cache = p_ctx->priv->cache;
484    if(main_cache && main_cache->position < main_cache->size)
485    {
486       cache->size = main_cache->size - main_cache->position;
487       if(cache->size > cache->mem_size) cache->size = cache->mem_size;
488       memcpy(cache->buffer, main_cache->buffer + main_cache->position, cache->size);
489       main_cache->position += cache->size;
490    }
491
492    /* Read the rest of the cache directly from the stream */
493    if(cache->mem_size > cache->size)
494    {
495       size_t ret = cache->io->pf_read(cache->io, cache->buffer + cache->size,
496                                       cache->mem_size - cache->size);
497       cache->size += ret;
498       cache->io->priv->actual_offset = cache->offset + cache->size;
499    }
500
501    status = vc_container_io_seek(p_ctx, cache->end);
502    if(status != VC_CONTAINER_SUCCESS)
503       return 0;
504
505    if(p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK)
506       return cache->size;
507    else
508       return size;
509 }
510
511 /*****************************************************************************/
512 static size_t vc_container_io_cache_refill( VC_CONTAINER_IO_T *p_ctx,
513    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache )
514 {
515    size_t ret = vc_container_io_cache_flush( p_ctx, cache, 1 );
516
517    if(ret) return 0; /* TODO what should we do there ? */
518
519    if(p_ctx->priv->actual_offset != cache->offset)
520    {
521       if(cache->io->pf_seek(cache->io, cache->offset) != VC_CONTAINER_SUCCESS)
522          return 0;
523    }
524
525    ret = cache->io->pf_read(cache->io, cache->buffer, cache->buffer_end - cache->buffer);
526    cache->size = ret;
527    cache->position = 0;
528    cache->io->priv->actual_offset = cache->offset + ret;
529    return ret;
530 }
531
532 /*****************************************************************************/
533 static size_t vc_container_io_cache_refill_bypass( VC_CONTAINER_IO_T *p_ctx,
534    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, uint8_t *buffer, size_t size )
535 {
536    size_t ret = vc_container_io_cache_flush( p_ctx, cache, 1 );
537
538    if(ret) return 0; /* TODO what should we do there ? */
539
540    if(p_ctx->priv->actual_offset != cache->offset)
541    {
542       if(cache->io->pf_seek(cache->io, cache->offset) != VC_CONTAINER_SUCCESS)
543          return 0;
544    }
545
546    ret = cache->io->pf_read(cache->io, buffer, size);
547    cache->size = cache->position = 0;
548    cache->offset += ret;
549    cache->io->priv->actual_offset = cache->offset;
550    return ret;
551 }
552
553 /*****************************************************************************/
554 static size_t vc_container_io_cache_read( VC_CONTAINER_IO_T *p_ctx,
555    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, uint8_t *data, size_t size )
556 {
557    size_t read = 0, bytes, ret;
558
559    while(size)
560    {
561       bytes = cache->size - cache->position; /* Bytes left in cache */
562
563 #if 1 // FIXME Only if stream is seekable
564       /* Try to read directly from the stream if the cache just gets in the way */
565       if(!bytes && size > cache->mem_size)
566       {
567          bytes = cache->mem_size;
568          ret = vc_container_io_cache_refill_bypass( p_ctx, cache, data + read, bytes);
569          read += ret;
570
571          if(ret != bytes) /* We didn't read as many bytes as we had hoped */
572             goto end;
573
574          size -= bytes;
575          continue;
576       }
577 #endif
578
579       /* Refill the cache if it is empty */
580       if(!bytes) bytes = vc_container_io_cache_refill( p_ctx, cache );
581       if(!bytes) goto end;
582
583       /* We do have some data in the cache so override the status */
584       p_ctx->status = VC_CONTAINER_SUCCESS;
585
586       /* Read data directly from the cache */
587       if(bytes > size) bytes = size;
588       memcpy(data + read, cache->buffer + cache->position, bytes);
589       cache->position += bytes;
590       read += bytes;
591       size -= bytes;
592    }
593
594  end:
595    vc_container_assert(cache->offset + cache->position == p_ctx->offset + read);
596    return read;
597 }
598
599 /*****************************************************************************/
600 static int32_t vc_container_io_cache_write( VC_CONTAINER_IO_T *p_ctx,
601    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, const uint8_t *data, size_t size )
602 {
603    int32_t written = 0;
604    size_t bytes, ret;
605
606    /* If we do not have a write cache then we need to flush it */
607    if(cache->size && !cache->dirty)
608    {
609       ret = vc_container_io_cache_flush( p_ctx, cache, 1 );
610       if(ret) return -(int32_t)ret;
611    }
612
613    while(size)
614    {
615       bytes = (cache->buffer_end - cache->buffer) - cache->position; /* Space left in cache */
616
617       /* Flush the cache if it is full */
618       if(!bytes)
619       {
620          /* Cache full, flush it */
621          ret = vc_container_io_cache_flush( p_ctx, cache, 0 );
622          if(ret)
623          {
624             written -= ret;
625             return written;
626          }
627          continue;
628       }
629
630       if(bytes > size) bytes = size;
631
632       if(!p_ctx->priv->async_io && bytes == cache->mem_size)
633       {
634          /* Write directly from the buffer */
635          ret = cache->io->pf_write(cache->io, data + written, bytes);
636          cache->offset += ret;
637          cache->io->priv->actual_offset += ret;
638       }
639       else
640       {
641          /* Write in the cache */
642          memcpy(cache->buffer + cache->position, data + written, bytes);
643          cache->position += bytes;
644          cache->dirty = 1;
645          ret = bytes;
646       }
647
648       written += ret;
649       if(ret != bytes) goto end;
650
651       size -= bytes;
652    }
653
654  end:
655    vc_container_assert(cache->offset + (int64_t)cache->position == p_ctx->offset + written);
656    if(cache->position > cache->size) cache->size = cache->position;
657    return written;
658 }
659
660 /*****************************************************************************/
661 static VC_CONTAINER_STATUS_T vc_container_io_cache_seek(VC_CONTAINER_IO_T *p_ctx,
662    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int64_t offset)
663 {
664    VC_CONTAINER_STATUS_T status;
665    size_t shift, ret;
666
667    /* Check if the seek position is within our cache */
668    if(offset >= cache->offset && offset < cache->offset + (int64_t)cache->size)
669    {
670       cache->position = offset - cache->offset;
671       return VC_CONTAINER_SUCCESS;
672    }
673
674    shift = cache->buffer - cache->mem;
675    if(!cache->dirty && shift && cache->size &&
676       offset >= cache->offset - (int64_t)shift && offset < cache->offset)
677    {
678       /* We need to refill the partial bit of the cache that we didn't take care of last time */
679       status = cache->io->pf_seek(cache->io, cache->offset - shift);
680       if(status != VC_CONTAINER_SUCCESS) return status;
681       cache->offset -= shift;
682       cache->buffer -= shift;
683
684       ret = cache->io->pf_read(cache->io, cache->buffer, shift);
685       vc_container_assert(ret == shift); /* FIXME: ret must = shift */
686       cache->size += shift;
687       cache->position = offset - cache->offset;
688       cache->io->priv->actual_offset = cache->offset + ret;
689       return VC_CONTAINER_SUCCESS;
690    }
691
692    if(cache->dirty) vc_container_io_cache_flush( p_ctx, cache, 1 );
693    // FIXME: what if all the data couldn't be flushed ?
694
695    if(p_ctx->priv->async_io) async_io_wait_complete( p_ctx->priv->async_io, cache, 1 );
696
697    status = cache->io->pf_seek(cache->io, offset);
698    if(status != VC_CONTAINER_SUCCESS) return status;
699
700    vc_container_io_cache_flush( p_ctx, cache, 1 );
701
702    cache->offset = offset;
703    cache->io->priv->actual_offset = offset;
704    return VC_CONTAINER_SUCCESS;
705 }
706
707 /*****************************************************************************/
708 static size_t vc_container_io_cache_flush( VC_CONTAINER_IO_T *p_ctx,
709    VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete )
710 {
711    size_t ret = 0, shift;
712
713    if(cache->position > cache->size) cache->size = cache->position;
714
715    if(cache->dirty && cache->size)
716    {
717       if(p_ctx->priv->actual_offset != cache->offset)
718       {
719          if(p_ctx->priv->async_io) async_io_wait_complete( p_ctx->priv->async_io, cache, complete );
720
721          if(cache->io->pf_seek(cache->io, cache->offset) != VC_CONTAINER_SUCCESS)
722             return 0;
723       }
724
725       if(p_ctx->priv->async_io)
726       {
727          ret = async_io_write( p_ctx->priv->async_io, cache );
728          if(async_io_wait_complete( p_ctx->priv->async_io, cache, complete ) != VC_CONTAINER_SUCCESS)
729             ret = 0;
730       }
731       else
732          ret = cache->io->pf_write(cache->io, cache->buffer, cache->size);
733
734       cache->io->priv->actual_offset = cache->offset + ret;
735       ret = cache->position - ret;
736    }
737    cache->dirty = 0;
738
739    cache->offset += cache->size;
740    if(cache->mem_size == cache->mem_max_size)
741    {
742       shift = cache->offset &(MEM_CACHE_ALIGNMENT-1);
743       cache->buffer = cache->mem + shift;
744    }
745
746    cache->position = cache->size = 0;
747    return ret;
748 }
749
750 /*****************************************************************************
751  * Asynchronous I/O.
752  * This is here to keep the I/O as busy as possible by allowing the writer
753  * to continue its work while the I/O is taking place in the background.
754  *****************************************************************************/
755
756 #ifdef ENABLE_CONTAINERS_ASYNC_IO
757 #include "vcos.h"
758
759 #define NUMPC(c,n,s) ((c) < (1<<(s)) ? (n) : ((n) / (c >> (s))))
760
761 static void stats_initialise(VC_CONTAINER_STATS_T *st, uint32_t shift)
762 {
763    memset(st, 0, sizeof(VC_CONTAINER_STATS_T));
764    st->shift = shift;
765 }
766
767 static void stats_add_value(VC_CONTAINER_STATS_T *st, uint32_t count, uint32_t num)
768 {
769    uint32_t numpc;
770    int i, j;
771
772    if(count == 0) 
773       return;
774
775    numpc = NUMPC(count, num, st->shift);
776    // insert in the right place
777    i=0;
778    while(i < VC_CONTAINER_STATS_BINS && st->record[i].count != 0 && st->record[i].numpc > numpc)
779       i++;
780
781    if(st->record[i].count != 0 && st->record[i].numpc == numpc)
782    {
783       // equal numpc, can merge now
784       st->record[i].count += count;
785       st->record[i].num += num;
786    }
787    else
788    {
789       // shift higher records up
790       for(j=VC_CONTAINER_STATS_BINS; j>i; j--)
791          st->record[j] = st->record[j-1];
792
793       // write record in
794       st->record[i].count = count;
795       st->record[i].num = num;
796       st->record[i].numpc = numpc;
797
798       // if full, join the two closest records
799       if(st->record[VC_CONTAINER_STATS_BINS].count)
800       {
801          uint32_t min_diff = 0;
802          j = -1;
803
804          // find closest, based on difference between numpc
805          for(i=0; i<VC_CONTAINER_STATS_BINS; i++)
806          {
807             uint32_t diff = st->record[i].numpc - st->record[i+1].numpc;
808             if(j == -1 || diff < min_diff)
809             {
810                j = i;
811                min_diff = diff;
812             }
813          }
814
815          // merge these records
816          st->record[j].count += st->record[j+1].count;
817          st->record[j].num += st->record[j+1].num;
818          st->record[j].numpc = NUMPC(st->record[j].count, st->record[j].num, st->shift);
819
820          // shift down higher records
821          while(++j < VC_CONTAINER_STATS_BINS)
822             st->record[j] = st->record[j+1];
823
824          // zero the free top record
825          st->record[VC_CONTAINER_STATS_BINS].count = 0;
826          st->record[VC_CONTAINER_STATS_BINS].num = 0;
827          st->record[VC_CONTAINER_STATS_BINS].numpc = 0;
828       }
829    }
830 }
831
832 typedef struct VC_CONTAINER_IO_ASYNC_T
833 {
834    VC_CONTAINER_IO_T *io;
835    VCOS_THREAD_T thread;
836    VCOS_SEMAPHORE_T spare_sema;
837    VCOS_SEMAPHORE_T queue_sema;
838    VCOS_EVENT_T wake_event;
839    int quit;
840
841    unsigned int num_area;
842    uint8_t *mem[MAX_NUM_MEMORY_AREAS];    /**< Base address of memory areas */
843    uint8_t *buffer[MAX_NUM_MEMORY_AREAS]; /**< When queued for writing, pointer to start of valid cache area */
844    size_t size[MAX_NUM_MEMORY_AREAS];     /**< When queued for writing, size of valid area to write */
845    unsigned int cur_area;
846
847    unsigned char stack[3000];
848    int error;
849
850    int stats_enable;
851    VC_CONTAINER_WRITE_STATS_T stats;
852
853 } VC_CONTAINER_IO_ASYNC_T;
854
855 /*****************************************************************************/
856 static void async_io_stats_initialise( struct VC_CONTAINER_IO_ASYNC_T *ctx, int enable )
857 {
858    ctx->stats_enable = enable;
859    stats_initialise(&ctx->stats.write, 8);
860    stats_initialise(&ctx->stats.wait, 0);
861    stats_initialise(&ctx->stats.flush, 0);
862 }
863
864 static void async_io_stats_get( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_WRITE_STATS_T *stats )
865 {
866    *stats = ctx->stats;
867 }
868
869 static void *async_io_thread(VOID *argv)
870 {
871    VC_CONTAINER_IO_ASYNC_T *ctx = argv;
872    unsigned int write_area = 0;
873
874    while (1)
875    {
876       unsigned long time = 0;
877
878       vcos_event_wait(&ctx->wake_event);
879       if(ctx->quit) break;
880
881       while(vcos_semaphore_trywait(&ctx->queue_sema) == VCOS_SUCCESS)
882       {
883          uint8_t *buffer = ctx->buffer[write_area];
884          size_t size = ctx->size[write_area];
885
886          if(ctx->stats_enable)
887             time = vcos_getmicrosecs();
888
889          if(ctx->io->pf_write(ctx->io, buffer, size) != size)
890             ctx->error = 1;
891
892          if(ctx->stats_enable)
893             stats_add_value(&ctx->stats.write, size, vcos_getmicrosecs() - time);
894
895          /* Signal that the write is done */
896          vcos_semaphore_post(&ctx->spare_sema);
897
898          if(++write_area == ctx->num_area)
899             write_area = 0;
900       }
901    }
902
903    return NULL;
904 }
905
906 static int async_io_write( VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_IO_PRIVATE_CACHE_T *cache )
907 {
908    unsigned long time = 0;
909    unsigned int offset;
910
911    if(ctx->stats_enable)
912       time = vcos_getmicrosecs();
913
914    /* post the current area */
915    ctx->buffer[ctx->cur_area] = cache->buffer;
916    ctx->size[ctx->cur_area] = cache->size;
917    vcos_semaphore_post(&ctx->queue_sema);
918    vcos_event_signal(&ctx->wake_event);
919
920    /* now we need to grab another area */
921    vcos_semaphore_wait(&ctx->spare_sema);
922    if(++ctx->cur_area == ctx->num_area)
923       ctx->cur_area = 0;
924
925    if(ctx->stats_enable)
926       stats_add_value(&ctx->stats.wait, 1, vcos_getmicrosecs() - time);
927
928    /* alter cache mem to point to the new cur_area */
929    offset = cache->buffer - cache->mem;
930    cache->mem = ctx->mem[ctx->cur_area];
931    cache->buffer = cache->mem + offset;
932    cache->buffer_end = cache->mem + cache->mem_size;
933
934    return ctx->error ? 0 : cache->size;
935 }
936
937 static VC_CONTAINER_STATUS_T async_io_wait_complete( struct VC_CONTAINER_IO_ASYNC_T *ctx,
938                                                      VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete )
939 {
940    unsigned int time = 0;
941
942    if(ctx->stats_enable)
943       time = vcos_getmicrosecs();
944
945    if(complete)
946    {
947       int num;
948       /* Need to make sure that all memory areas have been written out, so should have num-1 spare */
949       for(num=0; num<ctx->num_area-1; num++)
950          vcos_semaphore_wait(&ctx->spare_sema);
951
952       for(num=0; num<ctx->num_area-1; num++)
953          vcos_semaphore_post(&ctx->spare_sema);
954    }
955    else
956    {
957       /* Need to make sure we can acquire one memory area */
958       vcos_semaphore_wait(&ctx->spare_sema);
959       vcos_semaphore_post(&ctx->spare_sema);
960    }
961    
962    if(ctx->stats_enable)
963       stats_add_value(&ctx->stats.flush, 1, vcos_getmicrosecs() - time);
964
965    return ctx->error ? VC_CONTAINER_ERROR_FAILED : VC_CONTAINER_SUCCESS;
966 }
967
968 static VC_CONTAINER_IO_ASYNC_T *async_io_start( VC_CONTAINER_IO_T *io, int num_areas, VC_CONTAINER_STATUS_T *status )
969 {
970    VC_CONTAINER_IO_ASYNC_T *ctx = 0;
971    VCOS_UNSIGNED pri = 0;
972
973    /* Allocate our context  */
974    ctx = malloc(sizeof(*ctx));
975    if(!ctx) goto error_spare_sema;
976    memset(ctx, 0, sizeof(*ctx));
977    ctx->io = io;
978
979    ctx->mem[0] = io->priv->cache->mem;
980
981    for(ctx->num_area = 1; ctx->num_area < num_areas; ctx->num_area++)
982    {
983       ctx->mem[ctx->num_area] = malloc(io->priv->cache->mem_size);
984       if(!ctx->mem[ctx->num_area])
985          break;
986    }
987
988    if(ctx->num_area == 1) // no real benefit in asynchronous writes
989       goto error_spare_sema;
990
991    async_io_stats_initialise(ctx, 0);
992
993    if(vcos_semaphore_create(&ctx->spare_sema, "async_spare_sem", ctx->num_area-1) != VCOS_SUCCESS)
994       goto error_spare_sema;
995
996    if(vcos_semaphore_create(&ctx->queue_sema, "async_queue_sem", 0) != VCOS_SUCCESS)
997       goto error_queue_sema;
998
999    if (vcos_event_create(&ctx->wake_event, "async_wake_event") != VCOS_SUCCESS)
1000       goto error_event;
1001
1002    // run this thread at a slightly higher priority than the calling thread - that means that
1003    // we prefer to write to the SD card rather than filling the memory buffer.
1004    pri = vcos_thread_get_priority(vcos_thread_current());
1005    if(vcos_thread_create_classic(&ctx->thread, "async_io", async_io_thread, ctx,
1006          ctx->stack, sizeof(ctx->stack), pri-1, 10, VCOS_START) != VCOS_SUCCESS)
1007       goto error_thread;
1008
1009    if(status) *status = VC_CONTAINER_SUCCESS;
1010    return ctx;
1011
1012  error_thread:
1013    vcos_event_delete(&ctx->wake_event);
1014  error_event:
1015    vcos_semaphore_delete(&ctx->queue_sema);
1016  error_queue_sema:
1017    vcos_semaphore_delete(&ctx->spare_sema);
1018  error_spare_sema:
1019    if(ctx) free(ctx);
1020    if(status) *status = VC_CONTAINER_ERROR_FAILED;
1021    return 0;
1022 }
1023
1024 static VC_CONTAINER_STATUS_T async_io_stop( VC_CONTAINER_IO_ASYNC_T *ctx )
1025 {
1026    /* Block if a write operation is already in progress */
1027    //vcos_semaphore_wait(&ctx->sema);
1028    // XXX block until all done
1029
1030    ctx->quit = 1;
1031    vcos_event_signal(&ctx->wake_event);
1032    vcos_thread_join(&ctx->thread,NULL);
1033    vcos_event_delete(&ctx->wake_event);
1034    vcos_semaphore_delete(&ctx->queue_sema);
1035    vcos_semaphore_delete(&ctx->spare_sema);
1036
1037    while(ctx->num_area > 0)
1038       free(ctx->mem[--ctx->num_area]);
1039
1040    free(ctx);
1041    return VC_CONTAINER_SUCCESS;
1042 }
1043 #else
1044
1045 static struct VC_CONTAINER_IO_ASYNC_T *async_io_start( VC_CONTAINER_IO_T *io, int num_areas, VC_CONTAINER_STATUS_T *status )
1046 {
1047    VC_CONTAINER_PARAM_UNUSED(io);
1048    VC_CONTAINER_PARAM_UNUSED(num_areas);
1049    if(status) *status = VC_CONTAINER_ERROR_FAILED;
1050    return 0;
1051 }
1052
1053 static int async_io_write( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_IO_PRIVATE_CACHE_T *cache )
1054 {
1055    VC_CONTAINER_PARAM_UNUSED(ctx);
1056    VC_CONTAINER_PARAM_UNUSED(cache);
1057    return 0;
1058 }
1059
1060 static VC_CONTAINER_STATUS_T async_io_wait_complete( struct VC_CONTAINER_IO_ASYNC_T *ctx,
1061                                                      VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete )
1062 {
1063    VC_CONTAINER_PARAM_UNUSED(ctx);
1064    VC_CONTAINER_PARAM_UNUSED(cache);
1065    VC_CONTAINER_PARAM_UNUSED(complete);
1066    return 0;
1067 }
1068
1069 static VC_CONTAINER_STATUS_T async_io_stop( struct VC_CONTAINER_IO_ASYNC_T *ctx )
1070 {
1071    VC_CONTAINER_PARAM_UNUSED(ctx);
1072    return VC_CONTAINER_SUCCESS;
1073 }
1074
1075 static void async_io_stats_initialise( struct VC_CONTAINER_IO_ASYNC_T *ctx, int enable )
1076 {
1077    VC_CONTAINER_PARAM_UNUSED(ctx);
1078    VC_CONTAINER_PARAM_UNUSED(enable);
1079 }
1080
1081 static void async_io_stats_get( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_WRITE_STATS_T *stats )
1082 {
1083    VC_CONTAINER_PARAM_UNUSED(ctx);
1084    VC_CONTAINER_PARAM_UNUSED(stats);
1085 }
1086
1087
1088 #endif