/*
Copyright (c) 2012, Broadcom Europe Ltd
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
    * Neither the name of the copyright holder nor the
      names of its contributors may be used to endorse or promote products
      derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#include <stdlib.h>
#include <string.h>
#include "containers/containers.h"
#include "containers/core/containers_io.h"
#include "containers/core/containers_common.h"
#include "containers/core/containers_utils.h"
#include "containers/core/containers_uri.h"
#define MAX_NUM_CACHED_AREAS 16
#define MAX_NUM_MEMORY_AREAS 4
#define NUM_TMP_MEMORY_AREAS 2
#define MEM_CACHE_READ_MAX_SIZE      (32*1024)     /* Needs to be a power of 2 */
#define MEM_CACHE_WRITE_MAX_SIZE     (128*1024)    /* Needs to be a power of 2 */
#define MEM_CACHE_TMP_MAX_SIZE       (32*1024)     /* Needs to be a power of 2 */
#define MEM_CACHE_ALIGNMENT          (1*1024)      /* Needs to be a power of 2 */
#define MEM_CACHE_AREA_READ_MAX_SIZE (4*1024*1024) /* Needs to be a power of 2 */
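/* The sizes above must be powers of 2 so alignment can be done with a simple
 * mask instead of a modulo; the flush path below realigns the cache buffer
 * with: shift = cache->offset & (MEM_CACHE_ALIGNMENT-1) */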
typedef struct VC_CONTAINER_IO_PRIVATE_CACHE_T
{
   int64_t start;   /**< Offset to the start of the cached area in the stream */
   int64_t end;     /**< Offset to the end of the cached area in the stream */

   int64_t offset;  /**< Offset of the currently cached data in the stream */
   size_t size;     /**< Size of the cached area */
   bool dirty;      /**< Whether the cache is dirty and needs to be written back */

   size_t position; /**< Current position in the cache */

   uint8_t *buffer;     /**< Pointer to the start of the valid cache area */
   uint8_t *buffer_end; /**< Pointer to the end of the cache */

   unsigned int mem_max_size; /**< Maximum size of the memory cache */
   unsigned int mem_size;     /**< Size of the memory cache */
   uint8_t *mem;              /**< Pointer to the memory cache */

   VC_CONTAINER_IO_T *io;

} VC_CONTAINER_IO_PRIVATE_CACHE_T;
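/* Invariants maintained by the cache code below:
 * cache->mem <= cache->buffer < cache->buffer_end == cache->mem + cache->mem_size;
 * cache->offset is the stream offset corresponding to buffer[0];
 * 0 <= position <= size <= buffer_end - buffer. */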
typedef struct VC_CONTAINER_IO_PRIVATE_T
{
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache; /**< Current cache */

   unsigned int caches_num;
   VC_CONTAINER_IO_PRIVATE_CACHE_T caches;

   unsigned int cached_areas_num;
   VC_CONTAINER_IO_PRIVATE_CACHE_T cached_areas[MAX_NUM_CACHED_AREAS];

   int64_t actual_offset;

   struct VC_CONTAINER_IO_ASYNC_T *async_io;

} VC_CONTAINER_IO_PRIVATE_T;
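/* Note: "caches" is the single main (sliding) cache while "cached_areas" are
 * pinned byte ranges created by vc_container_io_cache(); "cache" points at
 * whichever of these the current file offset falls into, or is NULL when
 * caching is disabled. "actual_offset" is the position of the underlying
 * stream, which lags behind "offset" whenever data sits in a cache. */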
/*****************************************************************************/
VC_CONTAINER_STATUS_T vc_container_io_file_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
VC_CONTAINER_STATUS_T vc_container_io_null_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
VC_CONTAINER_STATUS_T vc_container_io_net_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
VC_CONTAINER_STATUS_T vc_container_io_pktfile_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
VC_CONTAINER_STATUS_T vc_container_io_http_open( VC_CONTAINER_IO_T *p_ctx, const char *uri,
   VC_CONTAINER_IO_MODE_T mode );
static VC_CONTAINER_STATUS_T io_seek_not_seekable(VC_CONTAINER_IO_T *p_ctx, int64_t offset);

static size_t vc_container_io_cache_read( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, uint8_t *data, size_t size );
static int32_t vc_container_io_cache_write( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, const uint8_t *data, size_t size );
static VC_CONTAINER_STATUS_T vc_container_io_cache_seek( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int64_t offset );
static size_t vc_container_io_cache_refill( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache );
static size_t vc_container_io_cache_flush( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete );

static struct VC_CONTAINER_IO_ASYNC_T *async_io_start( VC_CONTAINER_IO_T *io, int num_areas, VC_CONTAINER_STATUS_T * );
static VC_CONTAINER_STATUS_T async_io_stop( struct VC_CONTAINER_IO_ASYNC_T *ctx );
static int async_io_write( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_IO_PRIVATE_CACHE_T *cache );
static VC_CONTAINER_STATUS_T async_io_wait_complete( struct VC_CONTAINER_IO_ASYNC_T *ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete );
static void async_io_stats_initialise( struct VC_CONTAINER_IO_ASYNC_T *ctx, int enable );
static void async_io_stats_get( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_WRITE_STATS_T *stats );
/*****************************************************************************/
static VC_CONTAINER_IO_T *vc_container_io_open_core( const char *uri, VC_CONTAINER_IO_MODE_T mode,
                                                     VC_CONTAINER_IO_CAPABILITIES_T capabilities,
                                                     bool b_open, VC_CONTAINER_STATUS_T *p_status )
{
   VC_CONTAINER_STATUS_T status = VC_CONTAINER_SUCCESS;
   VC_CONTAINER_IO_T *p_ctx = 0;
   VC_CONTAINER_IO_PRIVATE_T *private = 0;
   unsigned int uri_length, caches = 1, cache_max_size, num_areas = MAX_NUM_MEMORY_AREAS;

   uri_length = strlen(uri) + 1;

   /* Allocate our context before trying out the different io modules */
   p_ctx = malloc( sizeof(*p_ctx) + sizeof(*private) + uri_length);
   if(!p_ctx) { status = VC_CONTAINER_ERROR_OUT_OF_MEMORY; goto error; }
   memset(p_ctx, 0, sizeof(*p_ctx) + sizeof(*private) + uri_length );
   p_ctx->priv = private = (VC_CONTAINER_IO_PRIVATE_T *)&p_ctx[1];
   p_ctx->uri = (char *)&private[1];
   memcpy((char *)p_ctx->uri, uri, uri_length);
   p_ctx->uri_parts = vc_uri_create();
   if(!p_ctx->uri_parts) { status = VC_CONTAINER_ERROR_OUT_OF_MEMORY; goto error; }
   vc_uri_parse(p_ctx->uri_parts, uri);

   if(b_open)
   {
      /* Open the actual i/o module */
      status = vc_container_io_null_open(p_ctx, uri, mode);
      if(status) status = vc_container_io_net_open(p_ctx, uri, mode);
      if(status) status = vc_container_io_pktfile_open(p_ctx, uri, mode);
#ifdef ENABLE_CONTAINER_IO_HTTP
      if(status) status = vc_container_io_http_open(p_ctx, uri, mode);
#endif
      if(status) status = vc_container_io_file_open(p_ctx, uri, mode);
      if(status != VC_CONTAINER_SUCCESS) goto error;

      if(!p_ctx->pf_seek || (p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK))
      {
         p_ctx->capabilities |= VC_CONTAINER_IO_CAPS_CANT_SEEK;
         p_ctx->pf_seek = io_seek_not_seekable;
      }
   }
   else
   {
      /* We're only creating an empty container i/o */
      p_ctx->capabilities = capabilities;
   }

   if(p_ctx->capabilities & VC_CONTAINER_IO_CAPS_NO_CACHING)
      caches = 0;

   if(mode == VC_CONTAINER_IO_MODE_WRITE) cache_max_size = MEM_CACHE_WRITE_MAX_SIZE;
   else cache_max_size = MEM_CACHE_READ_MAX_SIZE;

   if(mode == VC_CONTAINER_IO_MODE_WRITE &&
      vc_uri_path_extension(p_ctx->uri_parts) &&
      !strcasecmp(vc_uri_path_extension(p_ctx->uri_parts), "tmp"))
   {
      cache_max_size = MEM_CACHE_TMP_MAX_SIZE;
      num_areas = NUM_TMP_MEMORY_AREAS;
   }

   /* Check if the I/O needs caching */
   if(caches)
   {
      VC_CONTAINER_IO_PRIVATE_CACHE_T *cache = &p_ctx->priv->caches;
      cache->mem_max_size = cache_max_size;
      cache->mem_size = cache->mem_max_size;
      cache->io = p_ctx; /* Needed by the cache helpers below */
      cache->mem = malloc(p_ctx->priv->caches.mem_size);
      if(cache->mem)
      {
         cache->buffer = cache->mem;
         cache->buffer_end = cache->mem + cache->mem_size;
         p_ctx->priv->caches_num = 1;
      }
   }

   if(p_ctx->priv->caches_num)
      p_ctx->priv->cache = &p_ctx->priv->caches;

   /* Try to start an asynchronous io if we're in write mode and we've got at least 2 cache memory areas */
   if(mode == VC_CONTAINER_IO_MODE_WRITE && p_ctx->priv->cache && num_areas >= 2)
      p_ctx->priv->async_io = async_io_start( p_ctx, num_areas, 0 );

   if(p_status) *p_status = status;
   return p_ctx;

 error:
   if(p_ctx) vc_uri_release(p_ctx->uri_parts);
   if(p_ctx) free(p_ctx);
   if(p_status) *p_status = status;
   return 0;
}
/*****************************************************************************/
VC_CONTAINER_IO_T *vc_container_io_open( const char *uri, VC_CONTAINER_IO_MODE_T mode,
                                         VC_CONTAINER_STATUS_T *p_status )
{
   return vc_container_io_open_core( uri, mode, 0, true, p_status );
}
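/* Typical usage (sketch only; error handling elided, and
 * VC_CONTAINER_IO_MODE_READ is assumed to be the read-mode enumerator):
 *
 *    VC_CONTAINER_STATUS_T status;
 *    VC_CONTAINER_IO_T *io = vc_container_io_open("file.mp4", VC_CONTAINER_IO_MODE_READ, &status);
 *    uint8_t header[16];
 *    if(io && vc_container_io_read(io, header, sizeof(header)) == sizeof(header))
 *       vc_container_io_seek(io, 0); // rewind; served from the cache when possible
 *    if(io) vc_container_io_close(io);
 */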
/*****************************************************************************/
VC_CONTAINER_IO_T *vc_container_io_create( const char *uri, VC_CONTAINER_IO_MODE_T mode,
                                           VC_CONTAINER_IO_CAPABILITIES_T capabilities,
                                           VC_CONTAINER_STATUS_T *p_status )
{
   return vc_container_io_open_core( uri, mode, capabilities, false, p_status );
}
/*****************************************************************************/
VC_CONTAINER_STATUS_T vc_container_io_close( VC_CONTAINER_IO_T *p_ctx )
{
   unsigned int i;

   if(!p_ctx) return VC_CONTAINER_SUCCESS;

   if(p_ctx->priv->caches_num)
   {
      if(p_ctx->priv->caches.dirty)
         vc_container_io_cache_flush( p_ctx, &p_ctx->priv->caches, 1 );
   }

   /* When asynchronous io is in use, it owns the cache memory areas */
   if(p_ctx->priv->async_io)
      async_io_stop( p_ctx->priv->async_io );
   else if(p_ctx->priv->caches_num)
      free(p_ctx->priv->caches.mem);

   for(i = 0; i < p_ctx->priv->cached_areas_num; i++)
      free(p_ctx->priv->cached_areas[i].mem);

   p_ctx->pf_close(p_ctx);
   vc_uri_release(p_ctx->uri_parts);
   free(p_ctx);

   return VC_CONTAINER_SUCCESS;
}
/*****************************************************************************/
size_t vc_container_io_peek(VC_CONTAINER_IO_T *p_ctx, void *buffer, size_t size)
{
   size_t ret;

   if(p_ctx->priv->cache)
   {
      /* FIXME: do something a bit more clever than this */
      int64_t offset = p_ctx->offset;
      ret = vc_container_io_read(p_ctx, buffer, size);
      vc_container_io_seek(p_ctx, offset);
      return ret;
   }

   if (p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK)
      return 0; /* Peeking is impossible if we cannot seek back */

   ret = p_ctx->pf_read(p_ctx, buffer, size);
   p_ctx->pf_seek(p_ctx, p_ctx->offset);
   return ret;
}
/*****************************************************************************/
size_t vc_container_io_read(VC_CONTAINER_IO_T *p_ctx, void *buffer, size_t size)
{
   size_t ret;

   if(p_ctx->priv->cache)
      ret = vc_container_io_cache_read( p_ctx, p_ctx->priv->cache, (uint8_t*)buffer, size );
   else
   {
      ret = p_ctx->pf_read(p_ctx, buffer, size);
      p_ctx->priv->actual_offset += ret;
   }

   p_ctx->offset += ret;
   return ret;
}
/*****************************************************************************/
size_t vc_container_io_write(VC_CONTAINER_IO_T *p_ctx, const void *buffer, size_t size)
{
   int32_t ret;

   if(p_ctx->priv->cache)
      ret = vc_container_io_cache_write( p_ctx, p_ctx->priv->cache, (const uint8_t*)buffer, size );
   else
   {
      ret = (int32_t)p_ctx->pf_write(p_ctx, buffer, size);
      p_ctx->priv->actual_offset += ret;
   }

   if(ret > 0) p_ctx->offset += ret; /* A negative value signals a failed cache flush */
   return ret < 0 ? 0 : ret;
}
/*****************************************************************************/
size_t vc_container_io_skip(VC_CONTAINER_IO_T *p_ctx, size_t size)
{
   uint8_t value[64];

   /* For small skips it is cheaper to just read and discard the data */
   if(size <= sizeof(value))
      return vc_container_io_read(p_ctx, value, size);

   if(p_ctx->priv->cache)
   {
      if(vc_container_io_cache_seek(p_ctx, p_ctx->priv->cache, p_ctx->offset + size)) return 0;
      p_ctx->offset += size;
      return size;
   }

   if(vc_container_io_seek(p_ctx, p_ctx->offset + size)) return 0;
   return size;
}
/*****************************************************************************/
VC_CONTAINER_STATUS_T vc_container_io_seek(VC_CONTAINER_IO_T *p_ctx, int64_t offset)
{
   VC_CONTAINER_STATUS_T status;
   unsigned int i;

   /* Check if the requested position is in one of the cached areas */
   for(i = 0; i < p_ctx->priv->cached_areas_num; i++)
   {
      VC_CONTAINER_IO_PRIVATE_CACHE_T *cache = &p_ctx->priv->cached_areas[i];
      if(offset >= cache->start && offset < cache->end)
      {
         p_ctx->priv->cache = cache;
         break;
      }
   }
   if(i == p_ctx->priv->cached_areas_num)
      p_ctx->priv->cache = p_ctx->priv->caches_num ? &p_ctx->priv->caches : 0;

   if(p_ctx->priv->cache)
   {
      status = vc_container_io_cache_seek( p_ctx, p_ctx->priv->cache, offset );
      if(status == VC_CONTAINER_SUCCESS) p_ctx->offset = offset;
      return status;
   }

   if(p_ctx->status == VC_CONTAINER_SUCCESS &&
      offset == p_ctx->offset) return VC_CONTAINER_SUCCESS;

   status = p_ctx->pf_seek(p_ctx, offset);
   if(status == VC_CONTAINER_SUCCESS) p_ctx->offset = offset;
   p_ctx->priv->actual_offset = p_ctx->offset;
   return status;
}
/*****************************************************************************/
static VC_CONTAINER_STATUS_T io_seek_not_seekable(VC_CONTAINER_IO_T *p_ctx, int64_t offset)
{
   VC_CONTAINER_IO_PRIVATE_T *private = p_ctx->priv;

   vc_container_assert(offset >= private->actual_offset);
   if(offset == private->actual_offset) return VC_CONTAINER_SUCCESS;

   if(offset < private->actual_offset)
   {
      p_ctx->status = VC_CONTAINER_ERROR_EOS;
      return p_ctx->status;
   }

   /* Emulate a forward seek by reading and discarding the data */
   offset -= private->actual_offset;
   while(offset && !p_ctx->status)
   {
      uint8_t value[64];
      unsigned int ret, size = MIN(offset, 64);
      ret = p_ctx->pf_read(p_ctx, value, size);
      if(ret != size) p_ctx->status = VC_CONTAINER_ERROR_EOS;
      offset -= ret;
      private->actual_offset += ret;
   }

   return p_ctx->status;
}
/*****************************************************************************/
VC_CONTAINER_STATUS_T vc_container_io_control_list(VC_CONTAINER_IO_T *context, VC_CONTAINER_CONTROL_T operation, va_list args)
{
   VC_CONTAINER_STATUS_T status = VC_CONTAINER_ERROR_UNSUPPORTED_OPERATION;

   if (context->pf_control)
      status = context->pf_control(context, operation, args);

   /* Option to add generic I/O control here */

   if(operation == VC_CONTAINER_CONTROL_IO_FLUSH && context->priv->cache)
   {
      status = VC_CONTAINER_SUCCESS;
      (void)vc_container_io_cache_flush( context, context->priv->cache, 1 );
   }

   if(operation == VC_CONTAINER_CONTROL_SET_IO_PERF_STATS && context->priv->async_io)
   {
      status = VC_CONTAINER_SUCCESS;
      async_io_stats_initialise(context->priv->async_io, va_arg(args, int));
   }

   if(operation == VC_CONTAINER_CONTROL_GET_IO_PERF_STATS && context->priv->async_io)
   {
      status = VC_CONTAINER_SUCCESS;
      async_io_stats_get(context->priv->async_io, va_arg(args, VC_CONTAINER_WRITE_STATS_T *));
   }

   return status;
}
433 VC_CONTAINER_STATUS_T vc_container_io_control(VC_CONTAINER_IO_T *context, VC_CONTAINER_CONTROL_T operation, ...)
435 VC_CONTAINER_STATUS_T result;
438 va_start(args, operation);
439 result = vc_container_io_control_list(context, operation, args);
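/* Example (sketch): collecting write-performance statistics on an io that was
 * opened for writing and has asynchronous io running:
 *
 *    VC_CONTAINER_WRITE_STATS_T stats;
 *    vc_container_io_control(io, VC_CONTAINER_CONTROL_SET_IO_PERF_STATS, 1);
 *    ... perform writes ...
 *    vc_container_io_control(io, VC_CONTAINER_CONTROL_GET_IO_PERF_STATS, &stats);
 */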
/*****************************************************************************/
size_t vc_container_io_cache(VC_CONTAINER_IO_T *p_ctx, size_t size)
{
   VC_CONTAINER_IO_PRIVATE_T *private = p_ctx->priv;
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, *main_cache;
   VC_CONTAINER_STATUS_T status;

   /* Sanity checking */
   if(private->cached_areas_num >= MAX_NUM_CACHED_AREAS) return 0;

   cache = &private->cached_areas[private->cached_areas_num];
   cache->start = p_ctx->offset;
   cache->end = cache->start + size;
   cache->offset = p_ctx->offset;
   cache->position = cache->size = 0;
   cache->dirty = false;
   cache->io = p_ctx;

   /* Set the size of the cache area depending on the capabilities of the i/o */
   if(p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK)
      cache->mem_max_size = MEM_CACHE_AREA_READ_MAX_SIZE;
   else if((p_ctx->capabilities & VC_CONTAINER_IO_CAPS_SEEK_SLOW) &&
           size <= MEM_CACHE_AREA_READ_MAX_SIZE)
      cache->mem_max_size = MEM_CACHE_AREA_READ_MAX_SIZE;
   else
      cache->mem_max_size = MEM_CACHE_READ_MAX_SIZE;

   cache->mem_size = size;
   if(cache->mem_size > cache->mem_max_size) cache->mem_size = cache->mem_max_size;
   cache->mem = malloc(cache->mem_size);

   cache->buffer = cache->mem;
   cache->buffer_end = cache->mem + cache->mem_size;

   if(!cache->mem) return 0;
   private->cached_areas_num++;

   /* Copy any data we've got in the current cache into the new cache */
   main_cache = p_ctx->priv->cache;
   if(main_cache && main_cache->position < main_cache->size)
   {
      cache->size = main_cache->size - main_cache->position;
      if(cache->size > cache->mem_size) cache->size = cache->mem_size;
      memcpy(cache->buffer, main_cache->buffer + main_cache->position, cache->size);
      main_cache->position += cache->size;
   }

   /* Read the rest of the cache directly from the stream */
   if(cache->mem_size > cache->size)
   {
      size_t ret = cache->io->pf_read(cache->io, cache->buffer + cache->size,
                                      cache->mem_size - cache->size);
      cache->size += ret;
      cache->io->priv->actual_offset = cache->offset + cache->size;
   }

   /* Position the stream at the end of the cached area */
   status = vc_container_io_seek(p_ctx, cache->end);
   if(status != VC_CONTAINER_SUCCESS)
   {
      /* A non-seekable stream cannot recover from this */
      if(p_ctx->capabilities & VC_CONTAINER_IO_CAPS_CANT_SEEK)
         p_ctx->status = status;
      return 0;
   }

   return size;
}
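/* vc_container_io_cache() pins the next "size" bytes of the stream into a
 * dedicated cache area: later calls to vc_container_io_seek() that land inside
 * [start, end) are served from memory, which is what makes index-heavy
 * container formats usable on slow or non-seekable streams. */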
/*****************************************************************************/
static size_t vc_container_io_cache_refill( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache )
{
   size_t ret = vc_container_io_cache_flush( p_ctx, cache, 1 );

   if(ret) return 0; /* TODO what should we do there ? */

   if(p_ctx->priv->actual_offset != cache->offset)
   {
      if(cache->io->pf_seek(cache->io, cache->offset) != VC_CONTAINER_SUCCESS)
         return 0;
   }

   ret = cache->io->pf_read(cache->io, cache->buffer, cache->buffer_end - cache->buffer);
   cache->size = ret;
   cache->position = 0;
   cache->io->priv->actual_offset = cache->offset + ret;
   return ret;
}
/*****************************************************************************/
static size_t vc_container_io_cache_refill_bypass( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, uint8_t *buffer, size_t size )
{
   size_t ret = vc_container_io_cache_flush( p_ctx, cache, 1 );

   if(ret) return 0; /* TODO what should we do there ? */

   if(p_ctx->priv->actual_offset != cache->offset)
   {
      if(cache->io->pf_seek(cache->io, cache->offset) != VC_CONTAINER_SUCCESS)
         return 0;
   }

   /* Read straight into the caller's buffer, leaving the cache empty */
   ret = cache->io->pf_read(cache->io, buffer, size);
   cache->size = cache->position = 0;
   cache->offset += ret;
   cache->io->priv->actual_offset = cache->offset;
   return ret;
}
/*****************************************************************************/
static size_t vc_container_io_cache_read( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, uint8_t *data, size_t size )
{
   size_t read = 0, bytes, ret;

   while(size)
   {
      bytes = cache->size - cache->position; /* Bytes left in cache */

#if 1 // FIXME Only if stream is seekable
      /* Try to read directly from the stream if the cache just gets in the way */
      if(!bytes && size > cache->mem_size)
      {
         bytes = cache->mem_size;
         ret = vc_container_io_cache_refill_bypass( p_ctx, cache, data + read, bytes);
         read += ret;
         size -= ret;
         if(ret != bytes) /* We didn't read as many bytes as we had hoped */
            break;
         continue;
      }
#endif

      /* Refill the cache if it is empty */
      if(!bytes) bytes = vc_container_io_cache_refill( p_ctx, cache );
      if(!bytes) break;

      /* We do have some data in the cache so override the status */
      p_ctx->status = VC_CONTAINER_SUCCESS;

      /* Read data directly from the cache */
      if(bytes > size) bytes = size;
      memcpy(data + read, cache->buffer + cache->position, bytes);
      cache->position += bytes;
      read += bytes;
      size -= bytes;
   }

   vc_container_assert(cache->offset + cache->position == p_ctx->offset + read);
   return read;
}
/*****************************************************************************/
static int32_t vc_container_io_cache_write( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, const uint8_t *data, size_t size )
{
   size_t written = 0, bytes, ret;

   /* If the cache currently holds read data, we need to flush it first */
   if(cache->size && !cache->dirty)
   {
      ret = vc_container_io_cache_flush( p_ctx, cache, 1 );
      if(ret) return -(int32_t)ret;
   }

   while(size)
   {
      bytes = (cache->buffer_end - cache->buffer) - cache->position; /* Space left in cache */

      /* Flush the cache if it is full */
      if(!bytes)
      {
         /* Cache full, flush it */
         ret = vc_container_io_cache_flush( p_ctx, cache, 0 );
         if(ret)
         {
            if(written) break;
            return -(int32_t)ret;
         }
         continue;
      }

      if(bytes > size) bytes = size;

      if(!p_ctx->priv->async_io && bytes == cache->mem_size)
      {
         /* Write directly from the buffer */
         ret = cache->io->pf_write(cache->io, data + written, bytes);
         cache->offset += ret;
         cache->io->priv->actual_offset += ret;
      }
      else
      {
         /* Write in the cache */
         memcpy(cache->buffer + cache->position, data + written, bytes);
         cache->position += bytes;
         cache->dirty = true;
         ret = bytes;
      }

      written += ret;
      size -= ret;
      if(ret != bytes) goto end;
   }

 end:
   vc_container_assert(cache->offset + (int64_t)cache->position == p_ctx->offset + written);
   if(cache->position > cache->size) cache->size = cache->position;
   return (int32_t)written;
}
/*****************************************************************************/
static VC_CONTAINER_STATUS_T vc_container_io_cache_seek(VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int64_t offset)
{
   VC_CONTAINER_STATUS_T status;
   size_t shift, ret;

   /* Check if the seek position is within our cache */
   if(offset >= cache->offset && offset < cache->offset + (int64_t)cache->size)
   {
      cache->position = offset - cache->offset;
      return VC_CONTAINER_SUCCESS;
   }

   shift = cache->buffer - cache->mem;
   if(!cache->dirty && shift && cache->size &&
      offset >= cache->offset - (int64_t)shift && offset < cache->offset)
   {
      /* We need to refill the partial bit of the cache that we didn't take care of last time */
      status = cache->io->pf_seek(cache->io, cache->offset - shift);
      if(status != VC_CONTAINER_SUCCESS) return status;
      cache->offset -= shift;
      cache->buffer -= shift;

      ret = cache->io->pf_read(cache->io, cache->buffer, shift);
      vc_container_assert(ret == shift); /* FIXME: cope with ret != shift */
      cache->size += shift;
      cache->position = offset - cache->offset;
      cache->io->priv->actual_offset = cache->offset + ret;
      return VC_CONTAINER_SUCCESS;
   }

   if(cache->dirty) vc_container_io_cache_flush( p_ctx, cache, 1 );
   // FIXME: what if all the data couldn't be flushed ?

   if(p_ctx->priv->async_io) async_io_wait_complete( p_ctx->priv->async_io, cache, 1 );

   status = cache->io->pf_seek(cache->io, offset);
   if(status != VC_CONTAINER_SUCCESS) return status;

   vc_container_io_cache_flush( p_ctx, cache, 1 );

   cache->offset = offset;
   cache->io->priv->actual_offset = offset;
   return VC_CONTAINER_SUCCESS;
}
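/* The seek above distinguishes three cases: the target is inside the cached
 * window (just move "position"); the target falls in the small gap left in
 * front of the buffer by the alignment shift (back up and refill that gap);
 * anything else (flush dirty data, seek the underlying stream, reset). */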
/*****************************************************************************/
static size_t vc_container_io_cache_flush( VC_CONTAINER_IO_T *p_ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete )
{
   size_t ret = 0, shift;

   if(cache->position > cache->size) cache->size = cache->position;

   if(cache->dirty && cache->size)
   {
      if(p_ctx->priv->actual_offset != cache->offset)
      {
         if(p_ctx->priv->async_io) async_io_wait_complete( p_ctx->priv->async_io, cache, complete );
         if(cache->io->pf_seek(cache->io, cache->offset) != VC_CONTAINER_SUCCESS)
            return cache->size;
      }

      if(p_ctx->priv->async_io)
      {
         ret = async_io_write( p_ctx->priv->async_io, cache );
         if(async_io_wait_complete( p_ctx->priv->async_io, cache, complete ) != VC_CONTAINER_SUCCESS)
            ret = 0;
      }
      else
         ret = cache->io->pf_write(cache->io, cache->buffer, cache->size);

      cache->io->priv->actual_offset = cache->offset + ret;
      ret = cache->position - ret; /* Bytes which could not be written out */
   }

   cache->offset += cache->size;
   if(cache->mem_size == cache->mem_max_size)
   {
      /* Realign the cache buffer with the stream */
      shift = cache->offset & (MEM_CACHE_ALIGNMENT-1);
      cache->buffer = cache->mem + shift;
   }
   cache->position = cache->size = 0;
   cache->dirty = false;
   return ret;
}
/*****************************************************************************
 * Asynchronous I/O support.
 * This is here to keep the I/O as busy as possible by allowing the writer
 * to continue its work while the I/O is taking place in the background.
 *****************************************************************************/
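/* The scheme is a classic bounded producer / consumer: the writer fills one of
 * num_area cache memory areas while the background thread drains the others.
 * queue_sema counts areas queued for writing and spare_sema counts free areas
 * (initialised to num_area-1 so that one area is always owned by the writer). */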
#ifdef ENABLE_CONTAINERS_ASYNC_IO
#include "vcos.h" /* VCOS thread/semaphore/event primitives; header path per build setup */

/* Average "num" per 2^shift units of "count" (e.g. us per 256 bytes when shift == 8) */
#define NUMPC(c,n,s) ((c) < (1<<(s)) ? (n) : ((n) / ((c) >> (s))))
static void stats_initialise(VC_CONTAINER_STATS_T *st, uint32_t shift)
{
   memset(st, 0, sizeof(VC_CONTAINER_STATS_T));
   st->shift = shift;
}
static void stats_add_value(VC_CONTAINER_STATS_T *st, uint32_t count, uint32_t num)
{
   uint32_t numpc;
   int i = 0, j;

   if(!count)
      return;

   numpc = NUMPC(count, num, st->shift);

   // insert in the right place
   while(i < VC_CONTAINER_STATS_BINS && st->record[i].count != 0 && st->record[i].numpc > numpc)
      i++;

   if(st->record[i].count != 0 && st->record[i].numpc == numpc)
   {
      // equal numpc, can merge now
      st->record[i].count += count;
      st->record[i].num += num;
   }
   else
   {
      // shift higher records up
      for(j=VC_CONTAINER_STATS_BINS; j>i; j--)
         st->record[j] = st->record[j-1];

      st->record[i].count = count;
      st->record[i].num = num;
      st->record[i].numpc = numpc;

      // if full, join the two closest records
      if(st->record[VC_CONTAINER_STATS_BINS].count)
      {
         uint32_t min_diff = 0;
         j = -1;

         // find closest, based on difference between numpc
         for(i=0; i<VC_CONTAINER_STATS_BINS; i++)
         {
            uint32_t diff = st->record[i].numpc - st->record[i+1].numpc;
            if(j == -1 || diff < min_diff)
            {
               min_diff = diff;
               j = i;
            }
         }

         // merge these records
         st->record[j].count += st->record[j+1].count;
         st->record[j].num += st->record[j+1].num;
         st->record[j].numpc = NUMPC(st->record[j].count, st->record[j].num, st->shift);

         // shift down higher records
         while(++j < VC_CONTAINER_STATS_BINS)
            st->record[j] = st->record[j+1];

         // zero the free top record
         st->record[VC_CONTAINER_STATS_BINS].count = 0;
         st->record[VC_CONTAINER_STATS_BINS].num = 0;
         st->record[VC_CONTAINER_STATS_BINS].numpc = 0;
      }
   }
}
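/* Worked example: with shift == 8 (as used for the write stats below), a 128kB
 * write taking 20000us gives numpc = 20000 / (131072 >> 8) = 39, i.e. roughly
 * 39us per 256-byte unit. Writes with the same scaled rate share a bin, and
 * once all VC_CONTAINER_STATS_BINS bins are taken the two closest bins are
 * merged, so the table stays a fixed-size histogram of observed write rates. */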
typedef struct VC_CONTAINER_IO_ASYNC_T
{
   VC_CONTAINER_IO_T *io;
   VCOS_THREAD_T thread;
   VCOS_SEMAPHORE_T spare_sema;
   VCOS_SEMAPHORE_T queue_sema;
   VCOS_EVENT_T wake_event;
   int quit;
   int error;

   unsigned int num_area;
   uint8_t *mem[MAX_NUM_MEMORY_AREAS];    /**< Base address of memory areas */
   uint8_t *buffer[MAX_NUM_MEMORY_AREAS]; /**< When queued for writing, pointer to start of valid cache area */
   size_t size[MAX_NUM_MEMORY_AREAS];     /**< When queued for writing, size of valid area to write */
   unsigned int cur_area;

   unsigned char stack[3000];

   int stats_enable;
   VC_CONTAINER_WRITE_STATS_T stats;

} VC_CONTAINER_IO_ASYNC_T;
/*****************************************************************************/
static void async_io_stats_initialise( struct VC_CONTAINER_IO_ASYNC_T *ctx, int enable )
{
   ctx->stats_enable = enable;
   stats_initialise(&ctx->stats.write, 8);
   stats_initialise(&ctx->stats.wait, 0);
   stats_initialise(&ctx->stats.flush, 0);
}
static void async_io_stats_get( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_WRITE_STATS_T *stats )
{
   *stats = ctx->stats;
}
static void *async_io_thread(void *argv)
{
   VC_CONTAINER_IO_ASYNC_T *ctx = argv;
   unsigned int write_area = 0;
   unsigned long time = 0;

   while(1)
   {
      vcos_event_wait(&ctx->wake_event);

      while(vcos_semaphore_trywait(&ctx->queue_sema) == VCOS_SUCCESS)
      {
         uint8_t *buffer = ctx->buffer[write_area];
         size_t size = ctx->size[write_area];

         if(ctx->stats_enable)
            time = vcos_getmicrosecs();

         if(ctx->io->pf_write(ctx->io, buffer, size) != size)
            ctx->error = 1;

         if(ctx->stats_enable)
            stats_add_value(&ctx->stats.write, size, vcos_getmicrosecs() - time);

         /* Signal that the write is done */
         vcos_semaphore_post(&ctx->spare_sema);

         if(++write_area == ctx->num_area)
            write_area = 0;
      }

      /* Only quit once the queue has been drained */
      if(ctx->quit)
         break;
   }

   return NULL;
}
static int async_io_write( VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_IO_PRIVATE_CACHE_T *cache )
{
   unsigned long time = 0;
   size_t offset;

   if(ctx->stats_enable)
      time = vcos_getmicrosecs();

   /* post the current area */
   ctx->buffer[ctx->cur_area] = cache->buffer;
   ctx->size[ctx->cur_area] = cache->size;
   vcos_semaphore_post(&ctx->queue_sema);
   vcos_event_signal(&ctx->wake_event);

   /* now we need to grab another area */
   vcos_semaphore_wait(&ctx->spare_sema);
   if(++ctx->cur_area == ctx->num_area)
      ctx->cur_area = 0;

   if(ctx->stats_enable)
      stats_add_value(&ctx->stats.wait, 1, vcos_getmicrosecs() - time);

   /* alter cache mem to point to the new cur_area */
   offset = cache->buffer - cache->mem;
   cache->mem = ctx->mem[ctx->cur_area];
   cache->buffer = cache->mem + offset;
   cache->buffer_end = cache->mem + cache->mem_size;

   return ctx->error ? 0 : cache->size;
}
static VC_CONTAINER_STATUS_T async_io_wait_complete( struct VC_CONTAINER_IO_ASYNC_T *ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete )
{
   unsigned int time = 0;
   unsigned int num;
   VC_CONTAINER_PARAM_UNUSED(cache);

   if(ctx->stats_enable)
      time = vcos_getmicrosecs();

   if(complete)
   {
      /* Need to make sure that all memory areas have been written out, so should have num-1 spare */
      for(num=0; num<ctx->num_area-1; num++)
         vcos_semaphore_wait(&ctx->spare_sema);

      for(num=0; num<ctx->num_area-1; num++)
         vcos_semaphore_post(&ctx->spare_sema);
   }
   else
   {
      /* Need to make sure we can acquire one memory area */
      vcos_semaphore_wait(&ctx->spare_sema);
      vcos_semaphore_post(&ctx->spare_sema);
   }

   if(ctx->stats_enable)
      stats_add_value(&ctx->stats.flush, 1, vcos_getmicrosecs() - time);

   return ctx->error ? VC_CONTAINER_ERROR_FAILED : VC_CONTAINER_SUCCESS;
}
static VC_CONTAINER_IO_ASYNC_T *async_io_start( VC_CONTAINER_IO_T *io, int num_areas, VC_CONTAINER_STATUS_T *status )
{
   VC_CONTAINER_IO_ASYNC_T *ctx = 0;
   VCOS_UNSIGNED pri = 0;

   /* Allocate our context */
   ctx = malloc(sizeof(*ctx));
   if(!ctx) goto error_spare_sema;
   memset(ctx, 0, sizeof(*ctx));
   ctx->io = io;

   /* The 1st memory area is the one already allocated for the cache */
   ctx->mem[0] = io->priv->cache->mem;

   for(ctx->num_area = 1; ctx->num_area < (unsigned int)num_areas; ctx->num_area++)
   {
      ctx->mem[ctx->num_area] = malloc(io->priv->cache->mem_size);
      if(!ctx->mem[ctx->num_area])
         break;
   }

   if(ctx->num_area == 1) // no real benefit in asynchronous writes
      goto error_spare_sema;

   async_io_stats_initialise(ctx, 0);

   if(vcos_semaphore_create(&ctx->spare_sema, "async_spare_sem", ctx->num_area-1) != VCOS_SUCCESS)
      goto error_spare_sema;

   if(vcos_semaphore_create(&ctx->queue_sema, "async_queue_sem", 0) != VCOS_SUCCESS)
      goto error_queue_sema;

   if (vcos_event_create(&ctx->wake_event, "async_wake_event") != VCOS_SUCCESS)
      goto error_wake_event;

   // run this thread at a slightly higher priority than the calling thread - that means that
   // we prefer to write to the SD card rather than filling the memory buffer.
   pri = vcos_thread_get_priority(vcos_thread_current());
   if(vcos_thread_create_classic(&ctx->thread, "async_io", async_io_thread, ctx,
         ctx->stack, sizeof(ctx->stack), pri-1, 10, VCOS_START) != VCOS_SUCCESS)
      goto error_thread;

   if(status) *status = VC_CONTAINER_SUCCESS;
   return ctx;

 error_thread:
   vcos_event_delete(&ctx->wake_event);
 error_wake_event:
   vcos_semaphore_delete(&ctx->queue_sema);
 error_queue_sema:
   vcos_semaphore_delete(&ctx->spare_sema);
 error_spare_sema:
   if(ctx)
   {
      /* Only free the areas allocated here; mem[0] belongs to the cache */
      while(ctx->num_area > 1)
         free(ctx->mem[--ctx->num_area]);
      free(ctx);
   }
   if(status) *status = VC_CONTAINER_ERROR_FAILED;
   return NULL;
}
static VC_CONTAINER_STATUS_T async_io_stop( VC_CONTAINER_IO_ASYNC_T *ctx )
{
   /* Block if a write operation is already in progress */
   //vcos_semaphore_wait(&ctx->sema);
   // XXX block until all done

   ctx->quit = 1;
   vcos_event_signal(&ctx->wake_event);
   vcos_thread_join(&ctx->thread,NULL);
   vcos_event_delete(&ctx->wake_event);
   vcos_semaphore_delete(&ctx->queue_sema);
   vcos_semaphore_delete(&ctx->spare_sema);

   /* This also frees the memory area currently in use by the cache */
   while(ctx->num_area > 0)
      free(ctx->mem[--ctx->num_area]);

   free(ctx);
   return VC_CONTAINER_SUCCESS;
}
#else /* ENABLE_CONTAINERS_ASYNC_IO */

static struct VC_CONTAINER_IO_ASYNC_T *async_io_start( VC_CONTAINER_IO_T *io, int num_areas, VC_CONTAINER_STATUS_T *status )
{
   VC_CONTAINER_PARAM_UNUSED(io);
   VC_CONTAINER_PARAM_UNUSED(num_areas);
   if(status) *status = VC_CONTAINER_ERROR_FAILED;
   return NULL;
}

static int async_io_write( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_IO_PRIVATE_CACHE_T *cache )
{
   VC_CONTAINER_PARAM_UNUSED(ctx);
   VC_CONTAINER_PARAM_UNUSED(cache);
   return 0;
}

static VC_CONTAINER_STATUS_T async_io_wait_complete( struct VC_CONTAINER_IO_ASYNC_T *ctx,
   VC_CONTAINER_IO_PRIVATE_CACHE_T *cache, int complete )
{
   VC_CONTAINER_PARAM_UNUSED(ctx);
   VC_CONTAINER_PARAM_UNUSED(cache);
   VC_CONTAINER_PARAM_UNUSED(complete);
   return VC_CONTAINER_SUCCESS;
}

static VC_CONTAINER_STATUS_T async_io_stop( struct VC_CONTAINER_IO_ASYNC_T *ctx )
{
   VC_CONTAINER_PARAM_UNUSED(ctx);
   return VC_CONTAINER_SUCCESS;
}

static void async_io_stats_initialise( struct VC_CONTAINER_IO_ASYNC_T *ctx, int enable )
{
   VC_CONTAINER_PARAM_UNUSED(ctx);
   VC_CONTAINER_PARAM_UNUSED(enable);
}

static void async_io_stats_get( struct VC_CONTAINER_IO_ASYNC_T *ctx, VC_CONTAINER_WRITE_STATS_T *stats )
{
   VC_CONTAINER_PARAM_UNUSED(ctx);
   VC_CONTAINER_PARAM_UNUSED(stats);
}

#endif /* ENABLE_CONTAINERS_ASYNC_IO */