From: Anton Khirnov Date: Sat, 2 Jun 2012 05:26:41 +0000 (+0200) Subject: avconv: multithreaded demuxing. X-Git-Tag: v9_beta1~1489 X-Git-Url: http://review.tizen.org/git/?a=commitdiff_plain;h=5db5169e46a5f1676aafb82ec8c3f5dc6fb6bb6d;p=platform%2Fupstream%2Flibav.git avconv: multithreaded demuxing. When there are multiple input files, run demuxing for each input file in a separate thread, so reading packets does not block. This is useful for achieving low latency when reading from multiple (possibly slow) input streams. --- diff --git a/avconv.c b/avconv.c index 23ee164..7d50d58 100644 --- a/avconv.c +++ b/avconv.c @@ -69,6 +69,14 @@ #include #endif +#if HAVE_THREADS +#if HAVE_PTHREADS +#include +#else +#include "libavcodec/w32pthreads.h" +#endif +#endif + #include #include "cmdutils.h" @@ -140,6 +148,11 @@ static float dts_delta_threshold = 10; static int print_stats = 1; +#if HAVE_THREADS +/* signal to input threads that they should exit; set by the main thread */ +static int transcoding_finished; +#endif + #define DEFAULT_PASS_LOGFILENAME_PREFIX "av2pass" typedef struct InputFilter { @@ -219,6 +232,15 @@ typedef struct InputFile { int nb_streams; /* number of stream that avconv is aware of; may be different from ctx.nb_streams if new streams appear during av_read_frame() */ int rate_emu; + +#if HAVE_THREADS + pthread_t thread; /* thread reading from this file */ + int finished; /* the thread has exited */ + int joined; /* the thread has been joined */ + pthread_mutex_t fifo_lock; /* lock for access to fifo */ + pthread_cond_t fifo_cond; /* the main thread will signal on this cond after reading from fifo */ + AVFifoBuffer *fifo; /* demuxed packets are stored here; freed by the main thread */ +#endif } InputFile; typedef struct OutputStream { @@ -2765,6 +2787,125 @@ static int select_input_file(uint8_t *no_packet) return file_index; } +#if HAVE_THREADS +static void *input_thread(void *arg) +{ + InputFile *f = arg; + int ret = 0; + + while (!transcoding_finished && ret >= 0) { + AVPacket pkt; + ret = av_read_frame(f->ctx, &pkt); + + if (ret == AVERROR(EAGAIN)) { + usleep(10000); + ret = 0; + continue; + } else if (ret < 0) + break; + + pthread_mutex_lock(&f->fifo_lock); + while (!av_fifo_space(f->fifo)) + pthread_cond_wait(&f->fifo_cond, &f->fifo_lock); + + av_dup_packet(&pkt); + av_fifo_generic_write(f->fifo, &pkt, sizeof(pkt), NULL); + + pthread_mutex_unlock(&f->fifo_lock); + } + + f->finished = 1; + return NULL; +} + +static void free_input_threads(void) +{ + int i; + + if (nb_input_files == 1) + return; + + transcoding_finished = 1; + + for (i = 0; i < nb_input_files; i++) { + InputFile *f = input_files[i]; + AVPacket pkt; + + if (f->joined) + continue; + + pthread_mutex_lock(&f->fifo_lock); + while (av_fifo_size(f->fifo)) { + av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); + av_free_packet(&pkt); + } + pthread_cond_signal(&f->fifo_cond); + pthread_mutex_unlock(&f->fifo_lock); + + pthread_join(f->thread, NULL); + f->joined = 1; + + while (av_fifo_size(f->fifo)) { + av_fifo_generic_read(f->fifo, &pkt, sizeof(pkt), NULL); + av_free_packet(&pkt); + } + av_fifo_free(f->fifo); + } +} + +static int init_input_threads(void) +{ + int i, ret; + + if (nb_input_files == 1) + return 0; + + for (i = 0; i < nb_input_files; i++) { + InputFile *f = input_files[i]; + + if (!(f->fifo = av_fifo_alloc(8*sizeof(AVPacket)))) + return AVERROR(ENOMEM); + + pthread_mutex_init(&f->fifo_lock, NULL); + pthread_cond_init (&f->fifo_cond, NULL); + + if ((ret = pthread_create(&f->thread, NULL, input_thread, f))) + return AVERROR(ret); + } + return 0; +} + +static int get_input_packet_mt(InputFile *f, AVPacket *pkt) +{ + int ret = 0; + + pthread_mutex_lock(&f->fifo_lock); + + if (av_fifo_size(f->fifo)) { + av_fifo_generic_read(f->fifo, pkt, sizeof(*pkt), NULL); + pthread_cond_signal(&f->fifo_cond); + } else { + if (f->finished) + ret = AVERROR_EOF; + else + ret = AVERROR(EAGAIN); + } + + pthread_mutex_unlock(&f->fifo_lock); + + return ret; +} +#endif + +static int get_input_packet(InputFile *f, AVPacket *pkt) +{ +#if HAVE_THREADS + if (nb_input_files > 1) + return get_input_packet_mt(f, pkt); +#endif + return av_read_frame(f->ctx, pkt); +} + /* * The following code is the main loop of the file converter */ @@ -2790,6 +2931,11 @@ static int transcode(void) timer_start = av_gettime(); +#if HAVE_THREADS + if ((ret = init_input_threads()) < 0) + goto fail; +#endif + for (; received_sigterm == 0;) { int file_index, ist_index; AVPacket pkt; @@ -2810,12 +2956,13 @@ static int transcode(void) usleep(10000); continue; } + av_log(NULL, AV_LOG_VERBOSE, "No more inputs to read from, finishing.\n"); break; } - /* read a frame from it and output it in the fifo */ is = input_files[file_index]->ctx; - ret = av_read_frame(is, &pkt); + ret = get_input_packet(input_files[file_index], &pkt); + if (ret == AVERROR(EAGAIN)) { no_packet[file_index] = 1; no_packet_count++; @@ -2897,6 +3044,9 @@ static int transcode(void) /* dump report by using the output first video and audio streams */ print_report(0, timer_start); } +#if HAVE_THREADS + free_input_threads(); +#endif /* at the end of stream, we must flush the decoder buffers */ for (i = 0; i < nb_input_streams; i++) { @@ -2941,6 +3091,9 @@ static int transcode(void) fail: av_freep(&no_packet); +#if HAVE_THREADS + free_input_threads(); +#endif if (output_streams) { for (i = 0; i < nb_output_streams; i++) {