From 734689b1e8aa3fb47e6824909f50ac8abf7ed09e Mon Sep 17 00:00:00 2001 From: Malcolm Beattie Date: Wed, 23 Apr 1997 19:04:18 +0000 Subject: [PATCH] Rewrote programmer-level condition variables from scratch. Added support for detaching threads. Fixed handling for arguments passed in to threads and return values for joined threads. p4raw-id: //depot/perlext/Thread@7 --- README | 26 ++++--- Thread.pm | 21 ++++-- Thread.xs | 244 ++++++++++++++++++++++++++++++-------------------------------- lock.t | 27 +++++++ typemap | 6 +- 5 files changed, 176 insertions(+), 148 deletions(-) create mode 100644 lock.t diff --git a/README b/README index 32a891c..4205a32 100644 --- a/README +++ b/README @@ -1,19 +1,17 @@ -The file thrpatch-oct1 contains patches against perl5.001m which makes -a first stab at a multithreading perl5. If your version of patch can't -create file from scratch, then you'll need to create an empty thread.h -manually first. Perl itself will need to be built with -DUSE_THREADS -and very probably -DDEBUGGING since I haven't tested it without that -yet. If you're using MIT pthreads or another threads package that -needs pthread_init() to be called, then add -DNEED_PTHREAD_INIT. If -you're not using a threads library that follows the latest POSIX draft, -then you'll probably need to add -DOLD_PTHREADS_API. I haven't tested --DOLD_PTHREADS_API properly yet and I think you may still have to tweak -a couple of the mutex calls to follow the old API. +If your version of patch can't create a file from scratch, then you'll +need to create an empty thread.h manually first. Perl itself will need +to be built with -DUSE_THREADS yet. If you're using MIT pthreads or +another threads package that needs pthread_init() to be called, then +add -DNEED_PTHREAD_INIT. If you're using a threads library that only +follows one of the old POSIX drafts, then you'll probably need to add +-DOLD_PTHREADS_API. I haven't tested -DOLD_PTHREADS_API properly yet +and I think you may still have to tweak a couple of the mutex calls +to follow the old API. -These patches are copyright Malcolm Beattie 1995 and are freely +These patches are copyright Malcolm Beattie 1995-1997 and are freely distributable under your choice of the GNU Public License or the Artistic License (see the main perl distribution). -These are very preliminary patches and although it should be sufficient -to show roughly what's been going on, they're almost certainly not +These are preliminary patches and although it should be sufficient +to show roughly what's been going on, they're probably not going to produce a perl of any practical use yet. diff --git a/Thread.pm b/Thread.pm index 9ea8cd8..d2f2d8b 100644 --- a/Thread.pm +++ b/Thread.pm @@ -2,19 +2,28 @@ package Thread; require Exporter; require DynaLoader; @ISA = qw(Exporter DynaLoader); -@EXPORT_OK = qw(sync fast yield); +@EXPORT_OK = qw(sync fast yield cond_signal cond_broadcast cond_wait + async); + +# +# Methods +# + +# +# Exported functions +# +sub async (&) { + return new Thread $_[0]; +} -warn "about to bootstrap Thread\n"; bootstrap Thread; my $cv; -foreach $cv (\&yield, \&sync, \&join, \&fast, - \&waituntil, \&signal, \&broadcast) { - warn "Thread.pm: calling fast($cv)\n"; +foreach $cv (\&yield, \&sync, \&join, \&fast, \&DESTROY, + \&cond_wait, \&cond_signal, \&cond_broadcast) { fast($cv); } sync(\&new); # not sure if this needs to be sync'd -sync(\&Thread::Cond::new); # this needs syncing because of condpair_table 1; diff --git a/Thread.xs b/Thread.xs index dcb2d36..e131745 100644 --- a/Thread.xs +++ b/Thread.xs @@ -2,15 +2,6 @@ #include "perl.h" #include "XSUB.h" -typedef struct condpair { - pthread_mutex_t mutex; - pthread_cond_t cond; - Thread owner; -} condpair_t; - -AV *condpair_table; -typedef SSize_t Thread__Cond; - static void * threadstart(arg) void *arg; @@ -72,12 +63,13 @@ void *arg; op = pp_entersub(ARGS); if (op) runops(); - retval = stack_sp - (stack_base + oldmark); - sp -= retval; + SPAGAIN; + retval = sp - (stack_base + oldmark); + sp = stack_base + oldmark + 1; av_store(returnav, 0, newSVpv("", 0)); - for (i = 1; i <= retval; i++) - sv_setsv(*av_fetch(returnav, i, TRUE), *sp++); - + for (i = 1; i <= retval; i++, sp++) + sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp)); + finishoff: SvREFCNT_dec(stack); SvREFCNT_dec(cvcache); @@ -88,7 +80,13 @@ void *arg; Safefree(cxstack); Safefree(tmps_stack); + if (ThrSTATE(thr) == THR_DETACHED) { + SvREFCNT_dec(returnav); + ThrSETSTATE(thr, THR_DEAD); + } return (void *) returnav; /* Available for anyone to join with us */ + /* unless we are detached in which case */ + /* noone will see the value anyway. */ } Thread @@ -111,11 +109,12 @@ AV *initargs; /* top_env? */ /* runlevel */ cvcache = newHV(); + ThrSETSTATE(thr, THR_NORMAL); /* The following pushes the arg list and startsv onto the *new* stack */ PUSHMARK(sp); /* Could easily speed up the following greatly */ - for (i = 0; i < AvFILL(initargs); i++) + for (i = 0; i <= AvFILL(initargs); i++) XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE))); XPUSHs(SvREFCNT_inc(startsv)); PUTBACK; @@ -138,44 +137,6 @@ AV *initargs; return thr; } -void condpair_kick(SSize_t cond, SV *code, int broadcast_flag) { - condpair_t *condp; - HV *hvp; - GV *gvp; - CV *cv = sv_2cv(code, &hvp, &gvp, FALSE); - SV *sv = *av_fetch(condpair_table, cond, TRUE); - dTHR; - - if (!SvOK(sv)) - croak("bad Cond object argument"); - condp = (condpair_t *) SvPVX(sv); - /* Get ownership of condpair object */ - MUTEX_LOCK(&condp->mutex); - while (condp->owner && condp->owner != thr) - COND_WAIT(&condp->cond, &condp->mutex); - if (condp->owner == thr) { - MUTEX_UNLOCK(&condp->mutex); - croak("Recursing in Thread::Cond::waituntil"); - } - condp->owner = thr; - MUTEX_UNLOCK(&condp->mutex); - /* We now own the condpair object */ - perl_call_sv(code, G_SCALAR|G_NOARGS|G_DISCARD|G_EVAL); - /* Release condpair object */ - MUTEX_LOCK(&condp->mutex); - condp->owner = 0; - /* Signal or Broadcast condpair */ - if (broadcast_flag) - COND_BROADCAST(&condp->cond); - else - COND_SIGNAL(&condp->cond); - MUTEX_UNLOCK(&condp->mutex); - /* Check we don't need to propagate a die */ - sv = GvSV(gv_fetchpv("@", TRUE, SVt_PV)); - if (SvTRUE(sv)) - croak(SvPV(sv, na)); -} - static SV * fast(sv) SV *sv; @@ -200,7 +161,7 @@ Thread new(class, startsv, ...) SV * class SV * startsv - AV * av = av_make(items - 1, &ST(2)); + AV * av = av_make(items - 2, &ST(2)); CODE: RETVAL = newthread(startsv, av); OUTPUT: @@ -212,7 +173,7 @@ sync(sv) HV * hvp = NO_INIT GV * gvp = NO_INIT CODE: - SvFLAGS(sv_2cv(sv, &hvp, &gvp, FALSE)) |= SVpcv_SYNC; + SvFLAGS(sv_2cv(sv, &hvp, &gvp, FALSE)) |= SVp_SYNC; ST(0) = sv_mortalcopy(sv); void @@ -227,93 +188,122 @@ join(t) AV * av = NO_INIT int i = NO_INIT PPCODE: + if (ThrSTATE(t) == THR_DETACHED) + croak("tried to join a detached thread"); + else if (ThrSTATE(t) == THR_JOINED) + croak("tried to rejoin an already joined thread"); + else if (ThrSTATE(t) == THR_DEAD) + croak("tried to join a dead thread"); + if (pthread_join(t->Tself, (void **) &av)) croak("pthread_join failed"); + ThrSETSTATE(t, THR_JOINED); /* Could easily speed up the following if necessary */ for (i = 0; i <= AvFILL(av); i++) XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE))); void -yield(t) +detach(t) Thread t CODE: - pthread_yield(); - -MODULE = Thread PACKAGE = Thread::Cond - -Thread::Cond -new(class) - char * class - SV * sv = NO_INIT - condpair_t * condp = NO_INIT - CODE: - if (!condpair_table) - condpair_table = newAV(); - sv = newSVpv("", 0); - sv_grow(sv, sizeof(condpair_t)); - condp = (condpair_t *) SvPVX(sv); - MUTEX_INIT(&condp->mutex); - COND_INIT(&condp->cond); - condp->owner = 0; - av_push(condpair_table, sv); - RETVAL = AvFILL(condpair_table); - OUTPUT: - RETVAL + if (ThrSTATE(t) == THR_DETACHED) + croak("tried to detach an already detached thread"); + else if (ThrSTATE(t) == THR_JOINED) + croak("tried to detach an already joined thread"); + else if (ThrSTATE(t) == THR_DEAD) + croak("tried to detach a dead thread"); + if (pthread_detach(t->Tself)) + croak("pthread_detach failed"); + ThrSETSTATE(t, THR_DETACHED); void -waituntil(cond, code) - Thread::Cond cond - SV * code - SV * sv = NO_INIT - condpair_t * condp = NO_INIT - HV * hvp = NO_INIT - GV * gvp = NO_INIT - CV * cv = sv_2cv(code, &hvp, &gvp, FALSE); - I32 count = NO_INIT +DESTROY(t) + Thread t CODE: - sv = *av_fetch(condpair_table, cond, TRUE); - if (!SvOK(sv)) - croak("bad Cond object argument"); - condp = (condpair_t *) SvPVX(sv); - do { - /* Get ownership of condpair object */ - MUTEX_LOCK(&condp->mutex); - while (condp->owner && condp->owner != thr) - COND_WAIT(&condp->cond, &condp->mutex); - if (condp->owner == thr) { - MUTEX_UNLOCK(&condp->mutex); - croak("Recursing in Thread::Cond::waituntil"); - } - condp->owner = thr; - MUTEX_UNLOCK(&condp->mutex); - /* We now own the condpair object */ - count = perl_call_sv(code, G_SCALAR|G_NOARGS|G_EVAL); - SPAGAIN; - /* Release condpair object */ - MUTEX_LOCK(&condp->mutex); - condp->owner = 0; - MUTEX_UNLOCK(&condp->mutex); - /* See if we need to go round again */ - if (count == 0) - croak(SvPV(GvSV(gv_fetchpv("@", TRUE, SVt_PV)), na)); - else if (count > 1) - croak("waituntil code returned more than one value"); - sv = POPs; - PUTBACK; - } while (!SvTRUE(sv)); - ST(0) = sv_mortalcopy(sv); + if (ThrSTATE(t) == THR_NORMAL) { + if (pthread_detach(t->Tself)) + croak("pthread_detach failed"); + ThrSETSTATE(t, THR_DETACHED); + } void -signal(cond, code) - Thread::Cond cond - SV * code +yield() CODE: - condpair_kick(cond, code, 0); +#ifdef OLD_PTHREADS_API + pthread_yield(); +#else +#ifndef NO_SCHED_YIELD + sched_yield(); +#endif /* NO_SCHED_YIELD */ +#endif /* OLD_PTHREADS_API */ void -broadcast(cond, code) - Thread::Cond cond - SV * code - CODE: - condpair_kick(cond, code, 1); +cond_wait(sv) + SV * sv + MAGIC * mg = NO_INIT +CODE: + if (SvROK(sv)) { + /* + * Kludge to allow lock of real objects without requiring + * to pass in every type of argument by explicit reference. + */ + sv = SvRV(sv); + } + mg = condpair_magic(sv); + DEBUG_L(fprintf(stderr, "0x%lx: cond_wait 0x%lx\n", + (unsigned long)thr, (unsigned long)sv)); + MUTEX_LOCK(MgMUTEXP(mg)); + if (MgOWNER(mg) != thr) { + MUTEX_UNLOCK(MgMUTEXP(mg)); + croak("cond_wait for lock that we don't own\n"); + } + MgOWNER(mg) = 0; + COND_WAIT(MgCONDP(mg), MgMUTEXP(mg)); + MgOWNER(mg) = thr; + MUTEX_UNLOCK(MgMUTEXP(mg)); + +void +cond_signal(sv) + SV * sv + MAGIC * mg = NO_INIT +CODE: + if (SvROK(sv)) { + /* + * Kludge to allow lock of real objects without requiring + * to pass in every type of argument by explicit reference. + */ + sv = SvRV(sv); + } + mg = condpair_magic(sv); + DEBUG_L(fprintf(stderr, "0x%lx: cond_signal 0x%lx\n", + (unsigned long)thr, (unsigned long)sv)); + MUTEX_LOCK(MgMUTEXP(mg)); + if (MgOWNER(mg) != thr) { + MUTEX_UNLOCK(MgMUTEXP(mg)); + croak("cond_signal for lock that we don't own\n"); + } + COND_SIGNAL(MgCONDP(mg)); + MUTEX_UNLOCK(MgMUTEXP(mg)); +void +cond_broadcast(sv) + SV * sv + MAGIC * mg = NO_INIT +CODE: + if (SvROK(sv)) { + /* + * Kludge to allow lock of real objects without requiring + * to pass in every type of argument by explicit reference. + */ + sv = SvRV(sv); + } + mg = condpair_magic(sv); + DEBUG_L(fprintf(stderr, "0x%lx: cond_broadcast 0x%lx\n", + (unsigned long)thr, (unsigned long)sv)); + MUTEX_LOCK(MgMUTEXP(mg)); + if (MgOWNER(mg) != thr) { + MUTEX_UNLOCK(MgMUTEXP(mg)); + croak("cond_broadcast for lock that we don't own\n"); + } + COND_BROADCAST(MgCONDP(mg)); + MUTEX_UNLOCK(MgMUTEXP(mg)); diff --git a/lock.t b/lock.t new file mode 100644 index 0000000..d598718 --- /dev/null +++ b/lock.t @@ -0,0 +1,27 @@ +use Thread; + +$level = 0; + +sub worker +{ + my $num = shift; + my $i; + print "thread $num starting\n"; + for ($i = 1; $i <= 20; $i++) { + print "thread $num iteration $i\n"; + select(undef, undef, undef, rand(10)/100); + { + reset($lock); + warn "thread $num saw non-zero level = $level\n" if $level; + $level++; + print "thread $num has lock\n"; + select(undef, undef, undef, rand(10)/100); + $level--; + } + print "thread $num released lock\n"; + } +} + +for ($t = 1; $t <= 5; $t++) { + new Thread \&worker, $t; +} diff --git a/typemap b/typemap index 8d39ff1..4918b11 100644 --- a/typemap +++ b/typemap @@ -1,5 +1,4 @@ Thread T_IVOBJ -Thread::Cond T_IVOBJ INPUT T_IVOBJ @@ -7,6 +6,11 @@ T_IVOBJ $var = ($type) SvIV((SV*)SvRV($arg)); else croak(\"$var is not an object\") +T_IVREF + if (SvROK($arg)) + $var = ($type) SvIV((SV*)SvRV($arg)); + else + croak(\"$var is not a reference\") OUTPUT T_IVOBJ -- 2.7.4