From 458fb5819c1ac395635ee1129f0f694cb0128ffd Mon Sep 17 00:00:00 2001 From: Malcolm Beattie Date: Thu, 13 Nov 1997 18:01:27 +0000 Subject: [PATCH] Rewrite thread return code to distinguish between ordinary return and die() and make join propagate the die. Add tiny method eval which just does "return eval { shift->join; }". Add Thread::Specific class for access to thread specific user data along with specific.t. Rename Class to classname throughout Thread.xs for consistency. Fix pp_specific to pp_threadsv in global.sym. Add support to pp_entersub in pp_hot.c to lock stash for static locked methods. p4raw-id: //depot/perl@248 --- MANIFEST | 4 ++ ext/Thread/Thread.pm | 4 ++ ext/Thread/Thread.xs | 99 ++++++++++++++++++++++++++++++------------- ext/Thread/Thread/Specific.pm | 14 ++++++ ext/Thread/join.t | 2 +- ext/Thread/specific.t | 17 ++++++++ global.sym | 2 +- lib/fields.pm | 18 ++++++++ pp_hot.c | 12 ++++-- thread.h | 4 +- 10 files changed, 140 insertions(+), 36 deletions(-) create mode 100644 ext/Thread/Thread/Specific.pm create mode 100644 ext/Thread/specific.t create mode 100644 lib/fields.pm diff --git a/MANIFEST b/MANIFEST index 53ffcab..09747e1 100644 --- a/MANIFEST +++ b/MANIFEST @@ -212,15 +212,19 @@ ext/Thread/Notes Thread notes ext/Thread/README Thread README ext/Thread/Thread/Queue.pm Thread synchronised queue objects ext/Thread/Thread/Semaphore.pm Thread semaphore objects +ext/Thread/Thread/Specific.pm Thread specific data access ext/Thread/Thread.pm Thread extension Perl module ext/Thread/Thread.xs Thread extension external subroutines ext/Thread/create.t Test thread creation +ext/Thread/die.t Test thread die() +ext/Thread/die2.t Test thread die() differently ext/Thread/io.t Test threads doing simple I/O ext/Thread/join.t Test thread joining ext/Thread/join2.t Test thread joining differently ext/Thread/list.t Test getting list of all threads ext/Thread/lock.t Test lock primitive ext/Thread/queue.t Test Thread::Queue module +ext/Thread/specific.t Test thread-specific user data ext/Thread/sync.t Test thread synchronisation ext/Thread/sync2.t Test thread synchronisation ext/Thread/typemap Thread extension interface types diff --git a/ext/Thread/Thread.pm b/ext/Thread/Thread.pm index 2ace5dd..1936142 100644 --- a/ext/Thread/Thread.pm +++ b/ext/Thread/Thread.pm @@ -15,6 +15,10 @@ sub async (&) { return new Thread $_[0]; } +sub eval { + return eval { shift->join; }; +} + bootstrap Thread; 1; diff --git a/ext/Thread/Thread.xs b/ext/Thread/Thread.xs index 841b569..ba256c9 100644 --- a/ext/Thread/Thread.xs +++ b/ext/Thread/Thread.xs @@ -19,7 +19,7 @@ static int sig_pipe[2]; typedef struct thread *Thread; #define THREAD_RET_TYPE void * #define THREAD_RET_CAST(x) ((THREAD_RET_TYPE) x) -#endif; +#endif static void remove_thread(struct thread *t) @@ -47,7 +47,7 @@ threadstart(void *arg) dSP; I32 oldscope = scopestack_ix; I32 retval; - AV *returnav; + AV *av; int i; DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n", @@ -86,7 +86,8 @@ threadstart(void *arg) I32 oldmark = TOPMARK; I32 oldscope = scopestack_ix; I32 retval; - AV *returnav; + SV *sv; + AV *av = newAV(); int i, ret; dJMPENV; DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p waiting to start\n", @@ -114,6 +115,7 @@ threadstart(void *arg) DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n", thr, SvPEEK(TOPs))); +#ifdef OLD_WAY JMPENV_PUSH(ret); switch (ret) { case 3: @@ -127,7 +129,10 @@ threadstart(void *arg) while (scopestack_ix > oldscope) LEAVE; JMPENV_POP; - av_store(returnav, 0, newSViv(statusvalue)); + MUTEX_LOCK(&thr->mutex); + thr->flags |= THRf_DID_DIE; + MUTEX_UNLOCK(&thr->mutex); + av = newSVpvf("Thread called exit with value %d", statusvalue); goto finishoff; } @@ -143,18 +148,34 @@ threadstart(void *arg) op = pp_entersub(ARGS); if (op) runops(); +#else + sv = POPs; + PUTBACK; + perl_call_sv(sv, G_ARRAY|G_EVAL); +#endif SPAGAIN; retval = sp - (stack_base + oldmark); sp = stack_base + oldmark + 1; - DEBUG_L(for (i = 1; i <= retval; i++) - PerlIO_printf(PerlIO_stderr(), - "%p returnav[%d] = %s\n", - thr, i, SvPEEK(sp[i - 1]));) - returnav = newAV(); - av_store(returnav, 0, newSVpv("", 0)); - for (i = 1; i <= retval; i++, sp++) - sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp)); - + if (SvCUR(thr->errsv)) { + MUTEX_LOCK(&thr->mutex); + thr->flags |= THRf_DID_DIE; + MUTEX_UNLOCK(&thr->mutex); + av_store(av, 0, &sv_no); + av_store(av, 1, newSVsv(thr->errsv)); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p died: %s\n", + SvPV(thr->errsv, na)); + } else { + DEBUG_L(STMT_START { + for (i = 1; i <= retval; i++) { + PerlIO_printf(PerlIO_stderr(), "%p return[%d] = %s\n", + thr, i, SvPEEK(sp[i - 1]));) + } + } STMT_END); + av_store(av, 0, &sv_yes); + for (i = 1; i <= retval; i++, sp++) + sv_setsv(*av_fetch(av, i, TRUE), SvREFCNT_inc(*sp)); + } + finishoff: #if 0 /* removed for debug */ @@ -194,7 +215,7 @@ threadstart(void *arg) case THRf_R_DETACHED: ThrSETSTATE(thr, THRf_DEAD); MUTEX_UNLOCK(&thr->mutex); - SvREFCNT_dec(returnav); + SvREFCNT_dec(av); DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: DETACHED thread finished\n", thr)); remove_thread(thr); /* This might trigger main thread to finish */ @@ -204,7 +225,7 @@ threadstart(void *arg) croak("panic: illegal state %u at end of threadstart", ThrSTATE(thr)); /* NOTREACHED */ } - return THREAD_RET_CAST(returnav); /* Available for anyone to join with */ + return THREAD_RET_CAST(av); /* Available for anyone to join with */ /* us unless we're detached, in which */ /* case noone sees the value anyway. */ #endif @@ -214,7 +235,7 @@ threadstart(void *arg) } static SV * -newthread (SV *startsv, AV *initargs, char *Class) +newthread (SV *startsv, AV *initargs, char *classname) { #ifdef USE_THREADS dSP; @@ -274,7 +295,7 @@ newthread (SV *startsv, AV *initargs, char *Class) sv = newSViv(thr->tid); sv_magic(sv, thr->oursv, '~', 0, 0); SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; - return sv_bless(newRV_noinc(sv), gv_stashpv(Class, TRUE)); + return sv_bless(newRV_noinc(sv), gv_stashpv(classname, TRUE)); #else croak("No threads in this perl"); return &sv_undef; @@ -294,12 +315,12 @@ MODULE = Thread PACKAGE = Thread PROTOTYPES: DISABLE void -new(Class, startsv, ...) - char * Class +new(classname, startsv, ...) + char * classname SV * startsv AV * av = av_make(items - 2, &ST(2)); PPCODE: - XPUSHs(sv_2mortal(newthread(startsv, av, Class))); + XPUSHs(sv_2mortal(newthread(startsv, av, classname))); void join(t) @@ -329,9 +350,17 @@ join(t) } JOIN(t, &av); - /* Could easily speed up the following if necessary */ - for (i = 0; i <= AvFILL(av); i++) - XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE))); + if (SvTRUE(*av_fetch(av, 0, FALSE))) { + /* Could easily speed up the following if necessary */ + for (i = 1; i <= AvFILL(av); i++) + XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE))); + } else { + char *mess = SvPV(*av_fetch(av, 1, FALSE), na); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + "%p: join propagating die message: %s\n", + thr, mess)); + croak(mess); + } #endif void @@ -379,8 +408,8 @@ flags(t) #endif void -self(Class) - char * Class +self(classname) + char * classname PREINIT: SV *sv; PPCODE: @@ -388,7 +417,8 @@ self(Class) sv = newSViv(thr->tid); sv_magic(sv, thr->oursv, '~', 0, 0); SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; - PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(Class, TRUE)))); + PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), + gv_stashpv(classname, TRUE)))); #endif U32 @@ -486,8 +516,8 @@ CODE: #endif void -list(Class) - char * Class +list(classname) + char * classname PREINIT: Thread t; AV * av; @@ -510,7 +540,7 @@ list(Class) SV *sv = newSViv(0); /* fill in tid later */ sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */ av_push(av, sv_bless(newRV_noinc(sv), - gv_stashpv(Class, TRUE))); + gv_stashpv(classname, TRUE))); } } @@ -580,3 +610,14 @@ await_signal() OUTPUT: RETVAL +MODULE = Thread PACKAGE = Thread::Specific + +void +data(classname = "Thread::Specific") + char * classname + PPCODE: + if (AvFILL(thr->specific) == -1) { + GV *gv = gv_fetchpv("Thread::Specific::FIELDS", TRUE, SVt_PVHV); + av_store(thr->specific, 0, newRV((SV*)GvHV(gv))); + } + XPUSHs(sv_bless(newRV((SV*)thr->specific),gv_stashpv(classname,TRUE))); diff --git a/ext/Thread/Thread/Specific.pm b/ext/Thread/Thread/Specific.pm new file mode 100644 index 0000000..ec56539 --- /dev/null +++ b/ext/Thread/Thread/Specific.pm @@ -0,0 +1,14 @@ +package Thread::Specific; + +sub import { + use attrs qw(locked method); + require fields; + fields->import(@_); +} + +sub key_create { + use attrs qw(locked method); + return ++$FIELDS{__MAX__}; +} + +1; diff --git a/ext/Thread/join.t b/ext/Thread/join.t index 640256a..cba2c1c 100644 --- a/ext/Thread/join.t +++ b/ext/Thread/join.t @@ -8,4 +8,4 @@ print "Starting thread\n"; $t = new Thread \&foo, qw(foo bar baz); print "Joining with $t\n"; @results = $t->join(); -print "Joining returned @results\n"; +print "Joining returned ", scalar(@results), " values: @results\n"; diff --git a/ext/Thread/specific.t b/ext/Thread/specific.t new file mode 100644 index 0000000..da130b1 --- /dev/null +++ b/ext/Thread/specific.t @@ -0,0 +1,17 @@ +use Thread; + +use Thread::Specific qw(foo); + +sub count { + my $tid = Thread->self->tid; + my Thread::Specific $tsd = Thread::Specific::data; + for (my $i = 0; $i < 5; $i++) { + $tsd->{foo} = $i; + print "thread $tid count: $tsd->{foo}\n"; + select(undef, undef, undef, rand(2)); + } +}; + +for(my $t = 0; $t < 5; $t++) { + new Thread \&count; +} diff --git a/global.sym b/global.sym index c2c8b0b..2806ac6 100644 --- a/global.sym +++ b/global.sym @@ -958,7 +958,6 @@ pp_snetent pp_socket pp_sockpair pp_sort -pp_specific pp_splice pp_split pp_sprintf @@ -987,6 +986,7 @@ pp_system pp_syswrite pp_tell pp_telldir +pp_threadsv pp_tie pp_tied pp_time diff --git a/lib/fields.pm b/lib/fields.pm new file mode 100644 index 0000000..8e2d639 --- /dev/null +++ b/lib/fields.pm @@ -0,0 +1,18 @@ +package fields; + +sub import { + my $class = shift; + my ($package) = caller; + my $fields = \%{"$package\::FIELDS"}; + my $i = $fields->{__MAX__}; + foreach my $f (@_) { + if (defined($fields->{$f})) { + require Carp; + Carp::croak("Field name $f already in use"); + } + $fields->{$f} = ++$i; + } + $fields->{__MAX__} = $i; +} + +1; diff --git a/pp_hot.c b/pp_hot.c index b354540..6dbc259 100644 --- a/pp_hot.c +++ b/pp_hot.c @@ -1834,9 +1834,10 @@ PP(pp_entersub) #ifdef USE_THREADS /* * First we need to check if the sub or method requires locking. - * If so, we gain a lock on the CV or the first argument, as - * appropriate. This has to be inline because for FAKE_THREADS, - * COND_WAIT inlines code to reschedule by returning a new op. + * If so, we gain a lock on the CV, the first argument or the + * stash (for static methods), as appropriate. This has to be + * inline because for FAKE_THREADS, COND_WAIT inlines code to + * reschedule by returning a new op. */ MUTEX_LOCK(CvMUTEXP(cv)); if (CvFLAGS(cv) & CVf_LOCKED) { @@ -1850,6 +1851,11 @@ PP(pp_entersub) } if (SvROK(sv)) sv = SvRV(sv); + else { + STRLEN len; + char *stashname = SvPV(sv, len); + sv = (SV*)gv_stashpvn(stashname, len, TRUE); + } } else { sv = (SV*)cv; diff --git a/thread.h b/thread.h index d2455dc..cda4754 100644 --- a/thread.h +++ b/thread.h @@ -243,10 +243,10 @@ typedef struct thread *Thread; #define THRf_ZOMBIE 3 #define THRf_DEAD 4 -#define THRf_DIE_FATAL 8 +#define THRf_DID_DIE 8 /* ThrSTATE(t) and ThrSETSTATE(t) must only be called while holding t->mutex */ -#define ThrSTATE(t) ((t)->flags) +#define ThrSTATE(t) ((t)->flags & THRf_STATE_MASK) #define ThrSETSTATE(t, s) STMT_START { \ (t)->flags &= ~THRf_STATE_MASK; \ (t)->flags |= (s); \ -- 2.7.4