From a8b13468572d06fe32c9a08e3b583c160a9fea2a Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 20 Dec 2001 02:41:34 +0000 Subject: [PATCH] cothread cleanup. Original commit message from CVS: cothread cleanup. - removed some old code. - ran the thing through indent - rename cothread_init/free to cothread_context_init/free - implement cothread_free/destroy to clear the cothread_state slot - make cothread_init reuse empty slots. minor cleanups in error reporting (gstpad, gstelement) code cleanup in gstthread make unexpected state changes in gstthread less fatal free the cothread_state in the scheduler. This one passes all the current cvs testcases including dynamic-pipeline. --- gst/cothreads.c | 306 +++++++++++++++++++++---------------- gst/cothreads.h | 7 +- gst/gstelement.c | 2 +- gst/gstpad.c | 37 +++-- gst/gstqueue.c | 2 + gst/gstthread.c | 36 +++-- gst/schedulers/gstbasicscheduler.c | 19 ++- plugins/elements/gstqueue.c | 2 + 8 files changed, 236 insertions(+), 175 deletions(-) diff --git a/gst/cothreads.c b/gst/cothreads.c index db029a3..eae2012 100644 --- a/gst/cothreads.c +++ b/gst/cothreads.c @@ -21,19 +21,18 @@ */ #include -#include +#include #include -#include +#include #include #include #include -/* we make too much noise for normal debugging... */ -/* #define GST_DEBUG_FORCE_DISABLE */ #include "gst_private.h" #include "cothreads.h" #include "gstarch.h" +#include "gstlog.h" #define STACK_SIZE 0x200000 @@ -41,7 +40,8 @@ #define COTHREAD_MAXTHREADS 16 #define COTHREAD_STACKSIZE (STACK_SIZE/COTHREAD_MAXTHREADS) -struct _cothread_context { +struct _cothread_context +{ cothread_state *threads[COTHREAD_MAXTHREADS]; int nthreads; int current; @@ -49,79 +49,82 @@ struct _cothread_context { }; -pthread_key_t _cothread_key = -1; +static pthread_key_t _cothread_key = -1; /* Disablig this define allows you to shut off a few checks in * cothread_switch. This likely will speed things up fractionally */ #define COTHREAD_PARANOID /** - * cothread_init: + * cothread_context_init: * * Create and initialize a new cothread context * * Returns: the new cothread context */ -cothread_context* -cothread_init (void) +cothread_context * +cothread_context_init (void) { - cothread_context *ctx = (cothread_context *)malloc(sizeof(cothread_context)); + cothread_context *ctx = (cothread_context *) g_malloc (sizeof (cothread_context)); /* we consider the initiating process to be cothread 0 */ ctx->nthreads = 1; ctx->current = 0; - ctx->data = g_hash_table_new(g_str_hash, g_str_equal); + ctx->data = g_hash_table_new (g_str_hash, g_str_equal); - GST_INFO (GST_CAT_COTHREADS,"initializing cothreads"); + GST_INFO (GST_CAT_COTHREADS, "initializing cothreads"); if (_cothread_key == -1) { - if (pthread_key_create (&_cothread_key,NULL) != 0) { + if (pthread_key_create (&_cothread_key, NULL) != 0) { perror ("pthread_key_create"); return NULL; } } - pthread_setspecific (_cothread_key,ctx); + pthread_setspecific (_cothread_key, ctx); - memset (ctx->threads,0,sizeof(ctx->threads)); + memset (ctx->threads, 0, sizeof (ctx->threads)); - ctx->threads[0] = (cothread_state *)malloc(sizeof(cothread_state)); + ctx->threads[0] = (cothread_state *) g_malloc (sizeof (cothread_state)); ctx->threads[0]->ctx = ctx; ctx->threads[0]->threadnum = 0; ctx->threads[0]->func = NULL; ctx->threads[0]->argc = 0; ctx->threads[0]->argv = NULL; ctx->threads[0]->flags = COTHREAD_STARTED; - ctx->threads[0]->sp = (void *)CURRENT_STACK_FRAME; + ctx->threads[0]->sp = (void *) CURRENT_STACK_FRAME; ctx->threads[0]->pc = 0; /* initialize the lock */ #ifdef COTHREAD_ATOMIC atomic_set (&ctx->threads[0]->lock, 0); #else - ctx->threads[0]->lock = g_mutex_new(); + ctx->threads[0]->lock = g_mutex_new (); #endif - GST_INFO (GST_CAT_COTHREADS,"0th thread is %p at sp:%p",ctx->threads[0], ctx->threads[0]->sp); + GST_INFO (GST_CAT_COTHREADS, "0th thread is %p at sp:%p", ctx->threads[0], ctx->threads[0]->sp); return ctx; } +/** + * cothread_context_free: + * + * Free the cothread context. + */ void -cothread_free (cothread_context *ctx) +cothread_context_free (cothread_context *ctx) { gint i; - for (i=0; inthreads; i++) { + for (i = 0; i < ctx->nthreads; i++) { #ifndef COTHREAD_ATOMIC - if (ctx->threads[i]->lock) { - g_mutex_unlock(ctx->threads[i]->lock); + if (ctx->threads[i]) { + g_mutex_unlock (ctx->threads[i]->lock); g_mutex_free (ctx->threads[i]->lock); - ctx->threads[i]->lock = NULL; } #endif if (i == 0) { g_free (ctx->threads[i]); - ctx->threads[i] = NULL; } } g_hash_table_destroy (ctx->data); @@ -137,57 +140,94 @@ cothread_free (cothread_context *ctx) * Returns: the new cothread state or NULL on error */ cothread_state* -cothread_create (cothread_context *ctx) +cothread_create (cothread_context *ctx) { - cothread_state *s; + cothread_state *thread; + void *sp; + guchar *stack_end; + gint slot = 0; + + g_return_val_if_fail (ctx != NULL, NULL); if (ctx->nthreads == COTHREAD_MAXTHREADS) { - GST_DEBUG (0, "attempt to create > COTHREAD_MAXTHREADS\n"); + /* this is pretty fatal */ + g_warning ("cothread_create: attempt to create > COTHREAD_MAXTHREADS\n"); return NULL; } - GST_DEBUG (0,"pthread_self() %ld\n",pthread_self()); - /* if (0) { */ - if (pthread_self() == 0) { /* FIXME uh, what does this test really do? */ - s = (cothread_state *)malloc(COTHREAD_STACKSIZE); - GST_DEBUG (0,"new stack (case 1) at %p\n",s); - } else { - void *sp = CURRENT_STACK_FRAME; - /* FIXME this may not be 64bit clean - * could use casts to uintptr_t from inttypes.h - * if only all platforms had inttypes.h - */ - guchar *stack_end = (guchar *)((unsigned long)sp & ~(STACK_SIZE - 1)); - s = (cothread_state *)(stack_end + ((ctx->nthreads - 1) * - COTHREAD_STACKSIZE)); - GST_DEBUG (0,"new stack (case 2) at %p\n",s); - if (mmap((void *)s,COTHREAD_STACKSIZE, - PROT_READ|PROT_WRITE|PROT_EXEC,MAP_FIXED|MAP_PRIVATE|MAP_ANON, - -1,0) < 0) { - perror("mmap'ing cothread stack space"); - return NULL; - } + /* find a free spot in the stack, note slot 1 has the main thread */ + for (slot = 1; slot < ctx->nthreads; slot++) { + if (ctx->threads[slot] == NULL) + break; + } + + sp = CURRENT_STACK_FRAME; + /* FIXME this may not be 64bit clean + * could use casts to uintptr_t from inttypes.h + * if only all platforms had inttypes.h + */ + stack_end = (guchar *) ((gulong) sp & ~(STACK_SIZE - 1)); + + thread = (cothread_state *) (stack_end + ((slot - 1) * COTHREAD_STACKSIZE)); + GST_DEBUG (0, "new stack at %p\n", thread); + + if (mmap ((void *) thread, COTHREAD_STACKSIZE, + PROT_READ | PROT_WRITE | PROT_EXEC, MAP_FIXED | MAP_PRIVATE | MAP_ANON, -1, 0) < 0) { + perror ("mmap'ing cothread stack space"); + return NULL; } - s->ctx = ctx; - s->threadnum = ctx->nthreads; - s->flags = 0; - s->sp = ((guchar *)s + COTHREAD_STACKSIZE); - /* is this needed anymore? */ - s->top_sp = s->sp; + thread->ctx = ctx; + thread->threadnum = slot; + thread->flags = 0; + thread->sp = ((guchar *) thread + COTHREAD_STACKSIZE); + thread->top_sp = thread->sp; /* for debugging purposes to detect stack overruns */ /* initialize the lock */ #ifdef COTHREAD_ATOMIC - atomic_set (s->lock, 0); + atomic_set (thread->lock, 0); #else - s->lock = g_mutex_new(); + thread->lock = g_mutex_new (); #endif - GST_INFO (GST_CAT_COTHREADS,"created cothread #%d: %p at sp:%p lock:%p", ctx->nthreads, - s, s->sp, s->lock); + GST_INFO (GST_CAT_COTHREADS, "created cothread #%d in slot %d: %p at sp:%p lock:%p", + ctx->nthreads, slot, thread, thread->sp, thread->lock); + + ctx->threads[slot] = thread; + ctx->nthreads++; + + return thread; +} + +/** + * cothread_free: + * @thread: the cothread state + * + * Free the given cothread state + */ +void +cothread_free (cothread_state *thread) +{ + g_return_if_fail (thread != NULL); + + /* we simply flag the cothread for destruction here */ + thread->flags |= COTHREAD_DESTROYED; +} + +static void +cothread_destroy (cothread_state *thread) +{ + cothread_context *ctx; + + g_return_if_fail (thread != NULL); - ctx->threads[ctx->nthreads++] = s; + ctx = thread->ctx; +#ifndef COTHREAD_ATOMIC + g_mutex_free (thread->lock); +#endif + //munmap ((void *) thread, COTHREAD_STACKSIZE); - return s; + ctx->threads[thread->threadnum] = NULL; + ctx->nthreads--; } /** @@ -199,16 +239,13 @@ cothread_create (cothread_context *ctx) * * Set the cothread function */ -void -cothread_setfunc (cothread_state *thread, - cothread_func func, - int argc, - char **argv) +void +cothread_setfunc (cothread_state * thread, cothread_func func, int argc, char **argv) { thread->func = func; thread->argc = argc; thread->argv = argv; - thread->pc = (void *)func; + thread->pc = (void *) func; } /** @@ -219,10 +256,10 @@ cothread_setfunc (cothread_state *thread, * * Returns: the #cothread_state of the main (0th) thread */ -cothread_state* -cothread_main (cothread_context *ctx) +cothread_state * +cothread_main (cothread_context * ctx) { - GST_DEBUG (0,"returning %p, the 0th cothread\n",ctx->threads[0]); + GST_DEBUG (0, "returning %p, the 0th cothread\n", ctx->threads[0]); return ctx->threads[0]; } @@ -233,20 +270,21 @@ cothread_main (cothread_context *ctx) * * Returns: the #cothread_state of the main (0th) thread in the current pthread */ -cothread_state* +cothread_state * cothread_current_main (void) { - cothread_context *ctx = pthread_getspecific(_cothread_key); + cothread_context *ctx = pthread_getspecific (_cothread_key); + return ctx->threads[0]; } -static void -cothread_stub (void) +static void +cothread_stub (void) { - cothread_context *ctx = pthread_getspecific(_cothread_key); + cothread_context *ctx = pthread_getspecific (_cothread_key); register cothread_state *thread = ctx->threads[ctx->current]; - GST_DEBUG_ENTER(""); + GST_DEBUG_ENTER (""); thread->flags |= COTHREAD_STARTED; /* #ifdef COTHREAD_ATOMIC @@ -255,16 +293,16 @@ cothread_stub (void) * g_mutex_lock(thread->lock); * #endif */ - while (1) { - thread->func(thread->argc,thread->argv); + while (TRUE) { + thread->func (thread->argc, thread->argv); /* we do this to avoid ever returning, we just switch to 0th thread */ - cothread_switch(cothread_main(ctx)); + cothread_switch (cothread_main (ctx)); } thread->flags &= ~COTHREAD_STARTED; thread->pc = 0; thread->sp = thread->top_sp; - fprintf(stderr,"uh, yeah, we shouldn't be here, but we should deal anyway\n"); - GST_DEBUG_LEAVE(""); + fprintf (stderr, "uh, yeah, we shouldn't be here, but we should deal anyway\n"); + GST_DEBUG_LEAVE (""); } /** @@ -275,11 +313,13 @@ cothread_stub (void) * Returns: the current cothread id */ int cothread_getcurrent (void) __attribute__ ((no_instrument_function)); -int -cothread_getcurrent (void) +int +cothread_getcurrent (void) { - cothread_context *ctx = pthread_getspecific(_cothread_key); - if (!ctx) return -1; + cothread_context *ctx = pthread_getspecific (_cothread_key); + + if (!ctx) + return -1; return ctx->current; } @@ -292,13 +332,11 @@ cothread_getcurrent (void) * adds data to a cothread */ void -cothread_set_data (cothread_state *thread, - gchar *key, - gpointer data) +cothread_set_data (cothread_state * thread, gchar * key, gpointer data) { - cothread_context *ctx = pthread_getspecific(_cothread_key); + cothread_context *ctx = pthread_getspecific (_cothread_key); - g_hash_table_insert(ctx->data, key, data); + g_hash_table_insert (ctx->data, key, data); } /** @@ -311,12 +349,11 @@ cothread_set_data (cothread_state *thread, * Returns: the data assiciated with the key */ gpointer -cothread_get_data (cothread_state *thread, - gchar *key) +cothread_get_data (cothread_state * thread, gchar * key) { - cothread_context *ctx = pthread_getspecific(_cothread_key); + cothread_context *ctx = pthread_getspecific (_cothread_key); - return g_hash_table_lookup(ctx->data, key); + return g_hash_table_lookup (ctx->data, key); } /** @@ -325,72 +362,80 @@ cothread_get_data (cothread_state *thread, * * Switches to the given cothread state */ -void -cothread_switch (cothread_state *thread) +void +cothread_switch (cothread_state * thread) { cothread_context *ctx; cothread_state *current; int enter; #ifdef COTHREAD_PARANOID - if (thread == NULL) goto nothread; + if (thread == NULL) + goto nothread; #endif ctx = thread->ctx; #ifdef COTHREAD_PARANOID - if (ctx == NULL) goto nocontext; + if (ctx == NULL) + goto nocontext; #endif current = ctx->threads[ctx->current]; #ifdef COTHREAD_PARANOID - if (current == NULL) goto nocurrent; + if (current == NULL) + goto nocurrent; #endif - if (current == thread) goto selfswitch; + if (current == thread) + goto selfswitch; /* unlock the current thread, we're out of that context now */ #ifdef COTHREAD_ATOMIC /* do something to unlock the cothread */ #else - g_mutex_unlock(current->lock); + g_mutex_unlock (current->lock); #endif /* lock the next cothread before we even switch to it */ #ifdef COTHREAD_ATOMIC /* do something to lock the cothread */ #else - g_mutex_lock(thread->lock); + g_mutex_lock (thread->lock); #endif /* find the number of the thread to switch to */ - GST_INFO (GST_CAT_COTHREAD_SWITCH,"switching from cothread #%d to cothread #%d", - ctx->current,thread->threadnum); + GST_INFO (GST_CAT_COTHREAD_SWITCH, "switching from cothread #%d to cothread #%d", + ctx->current, thread->threadnum); ctx->current = thread->threadnum; /* save the current stack pointer, frame pointer, and pc */ #ifdef GST_ARCH_PRESETJMP - GST_ARCH_PRESETJMP(); + GST_ARCH_PRESETJMP (); #endif - enter = sigsetjmp(current->jmp, 1); + enter = sigsetjmp (current->jmp, 1); if (enter != 0) { - GST_DEBUG (0,"enter thread #%d %d %p<->%p (%d)\n",current->threadnum, enter, - current->sp, current->top_sp, current->top_sp-current->sp); + GST_DEBUG (0, "enter thread #%d %d %p<->%p (%d)\n", current->threadnum, enter, + current->sp, current->top_sp, current->top_sp - current->sp); return; } - GST_DEBUG (0,"exit thread #%d %d %p<->%p (%d)\n",current->threadnum, enter, - current->sp, current->top_sp, current->top_sp-current->sp); + GST_DEBUG (0, "exit thread #%d %d %p<->%p (%d)\n", current->threadnum, enter, + current->sp, current->top_sp, current->top_sp - current->sp); enter = 1; - GST_DEBUG (0,"set stack to %p\n", thread->sp); + if (current->flags & COTHREAD_DESTROYED) + cothread_destroy (current); + + GST_DEBUG (0, "set stack to %p\n", thread->sp); /* restore stack pointer and other stuff of new cothread */ if (thread->flags & COTHREAD_STARTED) { - GST_DEBUG (0,"in thread \n"); + GST_DEBUG (0, "in thread \n"); /* switch to it */ - siglongjmp(thread->jmp,1); - } else { - GST_ARCH_SETUP_STACK(thread->sp); - GST_ARCH_SET_SP(thread->sp); + siglongjmp (thread->jmp, 1); + } + else { + GST_ARCH_SETUP_STACK (thread->sp); + GST_ARCH_SET_SP (thread->sp); /* start it */ - GST_ARCH_CALL(cothread_stub); - GST_DEBUG (0,"exit thread \n"); + GST_ARCH_CALL (cothread_stub); + GST_DEBUG (0, "exit thread \n"); ctx->current = 0; } @@ -398,17 +443,17 @@ cothread_switch (cothread_state *thread) #ifdef COTHREAD_PARANOID nothread: - g_print("cothread: can't switch to NULL cothread!\n"); + g_print ("cothread: can't switch to NULL cothread!\n"); return; nocontext: - g_print("cothread: there's no context, help!\n"); - exit(2); + g_print ("cothread: there's no context, help!\n"); + exit (2); nocurrent: - g_print("cothread: there's no current thread, help!\n"); - exit(2); + g_print ("cothread: there's no current thread, help!\n"); + exit (2); #endif /* COTHREAD_PARANOID */ selfswitch: - g_print("cothread: trying to switch to same thread, legal but not necessary\n"); + g_print ("cothread: trying to switch to same thread, legal but not necessary\n"); return; } @@ -419,13 +464,13 @@ selfswitch: * Locks the cothread state. */ void -cothread_lock (cothread_state *thread) +cothread_lock (cothread_state * thread) { #ifdef COTHREAD_ATOMIC /* do something to lock the cothread */ #else if (thread->lock) - g_mutex_lock(thread->lock); + g_mutex_lock (thread->lock); #endif } @@ -438,13 +483,13 @@ cothread_lock (cothread_state *thread) * Returns: TRUE if the cothread could be locked. */ gboolean -cothread_trylock (cothread_state *thread) +cothread_trylock (cothread_state * thread) { #ifdef COTHREAD_ATOMIC /* do something to try to lock the cothread */ #else if (thread->lock) - return g_mutex_trylock(thread->lock); + return g_mutex_trylock (thread->lock); else return FALSE; #endif @@ -457,13 +502,12 @@ cothread_trylock (cothread_state *thread) * Unlock the cothread state. */ void -cothread_unlock (cothread_state *thread) +cothread_unlock (cothread_state * thread) { #ifdef COTHREAD_ATOMIC /* do something to unlock the cothread */ #else if (thread->lock) - g_mutex_unlock(thread->lock); + g_mutex_unlock (thread->lock); #endif } - diff --git a/gst/cothreads.h b/gst/cothreads.h index f523bf2..1d45009 100644 --- a/gst/cothreads.h +++ b/gst/cothreads.h @@ -42,6 +42,7 @@ typedef struct _cothread_context cothread_context; typedef int (*cothread_func) (int argc,char **argv); #define COTHREAD_STARTED 0x01 +#define COTHREAD_DESTROYED 0x02 struct _cothread_state { cothread_context *ctx; @@ -66,9 +67,11 @@ struct _cothread_state { }; -cothread_context* cothread_init (void); +cothread_context* cothread_context_init (void); +void cothread_context_free (cothread_context *ctx); + cothread_state* cothread_create (cothread_context *ctx); -void cothread_free (cothread_context *ctx); +void cothread_free (cothread_state *thread); void cothread_setfunc (cothread_state *thread, cothread_func func, int argc, char **argv); int cothread_getcurrent (void); diff --git a/gst/gstelement.c b/gst/gstelement.c index 6be17a4..44cb156 100644 --- a/gst/gstelement.c +++ b/gst/gstelement.c @@ -975,7 +975,7 @@ gst_element_change_state (GstElement *element) old_state = GST_STATE (element); if (GST_STATE_PENDING (element) == GST_STATE_VOID_PENDING || old_state == GST_STATE_PENDING (element)) { - g_warning ("no state change needed for element %s (VOID_PENDING)\n", GST_ELEMENT_NAME (element)); + GST_INFO (GST_CAT_STATES, "no state change needed for element %s (VOID_PENDING)\n", GST_ELEMENT_NAME (element)); return GST_STATE_SUCCESS; } diff --git a/gst/gstpad.c b/gst/gstpad.c index 113f190..2c52a5e 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -1451,15 +1451,18 @@ gst_pad_push (GstPad *pad, GstBuffer *buf) GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad)); g_return_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SRC); - g_return_if_fail (peer != NULL); + if (!peer) { + g_warning ("gst_pad_push but %s:%s is unconnected", GST_DEBUG_PAD_NAME (pad)); + return; + } if (peer->chainhandler) { GST_DEBUG (GST_CAT_DATAFLOW, "calling chainhandler &%s of peer pad %s:%s\n", - GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (((GstPad*)peer))); - (peer->chainhandler) (((GstPad*)peer), buf); + GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (GST_PAD (peer))); + (peer->chainhandler) (GST_PAD_FAST (peer), buf); } else { - GST_DEBUG (GST_CAT_DATAFLOW, "no chainhandler\n"); + g_warning ("gst_pad_push but %s:%s has but no chainhandler", GST_DEBUG_PAD_NAME (peer)); } } #endif @@ -1482,21 +1485,17 @@ gst_pad_pull (GstPad *pad) g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK, NULL); - if (!peer) - { -/* FIXME: g_critical is glib-2.0, not glib-1.2 - g_critical ("gst_pad_pull but %s:%s is unconnected", GST_DEBUG_PAD_NAME(pad)); -*/ - g_warning ("gst_pad_pull but %s:%s is unconnected", GST_DEBUG_PAD_NAME(pad)); - return NULL; - } + if (!peer) { + g_warning ("gst_pad_pull but %s:%s is unconnected", GST_DEBUG_PAD_NAME(pad)); + return NULL; + } if (peer->gethandler) { - GST_DEBUG (GST_CAT_DATAFLOW,"calling gethandler %s of peer pad %s:%s\n", - GST_DEBUG_FUNCPTR_NAME(peer->gethandler),GST_DEBUG_PAD_NAME(peer)); - return (peer->gethandler)(((GstPad*)peer)); + GST_DEBUG (GST_CAT_DATAFLOW, "calling gethandler %s of peer pad %s:%s\n", + GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer)); + return (peer->gethandler) (GST_PAD_FAST (peer)); } else { - GST_DEBUG (GST_CAT_DATAFLOW,"no gethandler for peer pad %s:%s at %p\n",GST_DEBUG_PAD_NAME(((GstPad*)peer)),&peer->gethandler); + g_warning ("gst_pad_pull but %s:%s has no gethandler", GST_DEBUG_PAD_NAME (peer)); return NULL; } } @@ -1535,9 +1534,9 @@ gst_pad_pullregion (GstPad *pad, GstRegionType type, guint64 offset, guint64 len GST_DEBUG_ENTER("(%s:%s,%d,%lld,%lld)",GST_DEBUG_PAD_NAME(pad),type,offset,len); if (peer->pullregionfunc) { - GST_DEBUG (GST_CAT_DATAFLOW,"calling pullregionfunc &%s of peer pad %s:%s\n", - GST_DEBUG_FUNCPTR_NAME(peer->pullregionfunc),GST_DEBUG_PAD_NAME(((GstPad*)peer))); - result = (peer->pullregionfunc)(((GstPad*)peer),type,offset,len); + GST_DEBUG (GST_CAT_DATAFLOW, "calling pullregionfunc &%s of peer pad %s:%s\n", + GST_DEBUG_FUNCPTR_NAME (peer->pullregionfunc), GST_DEBUG_PAD_NAME(GST_PAD_FAST (peer))); + result = (peer->pullregionfunc) (GST_PAD_FAST (peer), type, offset, len); } else { GST_DEBUG (GST_CAT_DATAFLOW,"no pullregionfunc\n"); result = NULL; diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 4561fa6..dac68ea 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -332,6 +332,7 @@ restart: /* if there's a pending state change for this queue or its manager, switch */ /* back to iterator so bottom half of state change executes */ while (GST_STATE (queue) != GST_STATE_PLAYING) { + //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); cothread_switch(cothread_current_main()); @@ -400,6 +401,7 @@ restart: * back to iterator so bottom half of state change executes */ while (GST_STATE (queue) != GST_STATE_PLAYING) { + //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); cothread_switch(cothread_current_main()); diff --git a/gst/gstthread.c b/gst/gstthread.c index a11bcfe..e6a8cd5 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -318,31 +318,29 @@ gst_thread_change_state (GstElement * element) GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING); while (elements) { - GstElement *e = GST_ELEMENT (elements->data); + GstElement *element = GST_ELEMENT (elements->data); - g_assert (e); - THR_DEBUG (" element \"%s\"\n", GST_ELEMENT_NAME (e)); + g_assert (element); + THR_DEBUG (" element \"%s\"\n", GST_ELEMENT_NAME (element)); elements = g_list_next (elements); - if (GST_IS_QUEUE (e)) { + if (GST_IS_QUEUE (element)) { /* FIXME make this more efficient by only waking queues that are asleep * FIXME and only waking the appropriate condition (depending on if it's * FIXME on up- or down-stream side) * FIXME also make this more efficient by keeping list of managed queues */ - THR_DEBUG ("waking queue \"%s\"\n", GST_ELEMENT_NAME (e)); - gst_element_set_state (e, GST_STATE_PAUSED); + THR_DEBUG ("waking queue \"%s\"\n", GST_ELEMENT_NAME (element)); + gst_element_set_state (element, GST_STATE_PAUSED); } else { - GList *pads = GST_ELEMENT_PADS (e); + GList *pads = GST_ELEMENT_PADS (element); while (pads) { - GstRealPad *peer; + GstRealPad *peer = GST_PAD_PEER (pads->data); GstElement *peerelement; - GstPad *p = GST_PAD (pads->data); pads = g_list_next (pads); - peer = GST_PAD_PEER (p); if (!peer) continue; @@ -364,7 +362,7 @@ gst_thread_change_state (GstElement * element) if (GST_ELEMENT_SCHED (peerelement) != GST_ELEMENT_SCHED (thread)) { GstQueue *queue = GST_QUEUE (peerelement); - THR_DEBUG (" element \"%s\" has pad cross sched boundary\n", GST_ELEMENT_NAME (e)); + THR_DEBUG (" element \"%s\" has pad cross sched boundary\n", GST_ELEMENT_NAME (element)); /* FIXME!! */ g_mutex_lock (queue->qlock); g_cond_signal (queue->not_full); @@ -470,8 +468,12 @@ gst_thread_main_loop (void *arg) gst_element_statename (GST_STATE_PAUSED)); g_cond_wait (thread->cond,thread->lock); - g_assert (GST_STATE_PENDING (thread) == GST_STATE_NULL || - GST_STATE_PENDING (thread) == GST_STATE_PAUSED); + /* this must have happened by a state change in the thread context */ + if (GST_STATE_PENDING (thread) != GST_STATE_NULL && + GST_STATE_PENDING (thread) != GST_STATE_PAUSED) { + g_cond_signal (thread->cond); + continue; + } /* been signaled, we need to state transition now and signal back */ gst_thread_update_state (thread); @@ -491,8 +493,12 @@ gst_thread_main_loop (void *arg) gst_element_statename (GST_STATE_PLAYING)); g_cond_wait (thread->cond,thread->lock); - g_assert (GST_STATE_PENDING (thread) == GST_STATE_READY || - GST_STATE_PENDING (thread) == GST_STATE_PLAYING); + /* this must have happened by a state change in the thread context */ + if (GST_STATE_PENDING (thread) != GST_STATE_READY && + GST_STATE_PENDING (thread) != GST_STATE_PLAYING) { + g_cond_signal (thread->cond); + continue; + } /* been signaled, we need to state transition now and signal back */ gst_thread_update_state (thread); diff --git a/gst/schedulers/gstbasicscheduler.c b/gst/schedulers/gstbasicscheduler.c index f465859..9242cd2 100644 --- a/gst/schedulers/gstbasicscheduler.c +++ b/gst/schedulers/gstbasicscheduler.c @@ -622,6 +622,12 @@ gst_basic_scheduler_chain_disable_element (GstSchedulerChain * chain, GstElement /* reschedule the chain */ /* FIXME this should be done only if manager state != NULL */ /* gst_basic_scheduler_cothreaded_chain(GST_BIN(chain->sched->parent),chain); */ + /* FIXME is this right? */ + /* we have to check for a threadstate here because a queue doesn't have one */ + if (element->threadstate) { + cothread_free (element->threadstate); + element->threadstate = NULL; + } } static void @@ -787,7 +793,7 @@ gst_basic_scheduler_setup (GstScheduler *sched) /* first create thread context */ if (bin->threadcontext == NULL) { GST_DEBUG (GST_CAT_SCHEDULING, "initializing cothread context\n"); - bin->threadcontext = cothread_init (); + bin->threadcontext = cothread_context_init (); } } @@ -805,7 +811,7 @@ gst_basic_scheduler_reset (GstScheduler *sched) ctx = GST_BIN (GST_SCHED_PARENT (sched))->threadcontext; - cothread_free (ctx); + cothread_context_free (ctx); GST_BIN (GST_SCHED_PARENT (sched))->threadcontext = NULL; } @@ -992,6 +998,7 @@ gst_basic_scheduler_pad_disconnect (GstScheduler * sched, GstPad * srcpad, GstPa chain2 = gst_basic_scheduler_find_chain (sched, element2); if (chain1 != chain2) { + /* elements not in the same chain don't need to be separated */ GST_INFO (GST_CAT_SCHEDULING, "elements not in the same chain"); return; } @@ -1006,10 +1013,7 @@ gst_basic_scheduler_pad_disconnect (GstScheduler * sched, GstPad * srcpad, GstPa } /* check the other element to see if it landed in the newly created chain */ - if (chain2) { - GST_INFO (GST_CAT_SCHEDULING, "destroying chain"); - gst_basic_scheduler_chain_destroy (chain2); - + if (gst_basic_scheduler_find_chain (sched, element2) == NULL) { /* if not in chain, create chain and build from scratch */ chain2 = gst_basic_scheduler_chain_new (sched); gst_basic_scheduler_chain_recursive_add (chain2, element2); @@ -1076,7 +1080,8 @@ gst_basic_scheduler_iterate (GstScheduler * sched) /* step through all the chains */ chains = sched->chains; - g_return_val_if_fail (chains != NULL, FALSE); + if (chains == NULL) + return FALSE; while (chains) { chain = (GstSchedulerChain *) (chains->data); diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 4561fa6..dac68ea 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -332,6 +332,7 @@ restart: /* if there's a pending state change for this queue or its manager, switch */ /* back to iterator so bottom half of state change executes */ while (GST_STATE (queue) != GST_STATE_PLAYING) { + //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); cothread_switch(cothread_current_main()); @@ -400,6 +401,7 @@ restart: * back to iterator so bottom half of state change executes */ while (GST_STATE (queue) != GST_STATE_PLAYING) { + //while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); cothread_switch(cothread_current_main()); -- 2.7.4