From 8718f9a1b30f0e2dc3598c478b0edf7f5b51c660 Mon Sep 17 00:00:00 2001 From: "Jerry D. Hedden" Date: Wed, 20 Dec 2006 02:30:21 -0800 Subject: [PATCH] threads 1.57 From: "Jerry D. Hedden" Message-ID: <20061220183021.79793.qmail@web30205.mail.mud.yahoo.com> p4raw-id: //depot/perl@29608 --- ext/threads/Changes | 7 +++ ext/threads/README | 2 +- ext/threads/t/exit.t | 16 ++++--- ext/threads/t/join.t | 49 +++++++++++++++++++- ext/threads/t/state.t | 90 ++++++++++++++++++++++++++++++++++--- ext/threads/t/thread.t | 2 +- ext/threads/threads.pm | 8 ++-- ext/threads/threads.xs | 118 ++++++++++++++++++++++++++++++------------------- 8 files changed, 225 insertions(+), 67 deletions(-) diff --git a/ext/threads/Changes b/ext/threads/Changes index 643155f..637926f 100755 --- a/ext/threads/Changes +++ b/ext/threads/Changes @@ -1,5 +1,12 @@ Revision history for Perl extension threads. +1.57 Wed Dec 20 13:10:26 EST 2006 + - Fixes courtesy of Michael J. Pomraning + Eliminates self joins. + Eliminates multiple, simultaneous joins on a thread. + Protects thread->state variable with mutexes. + Checks that OS join call is successful. + 1.56 Fri Dec 15 12:18:47 EST 2006 - More fixes to test suite diff --git a/ext/threads/README b/ext/threads/README index ae21582..210d012 100755 --- a/ext/threads/README +++ b/ext/threads/README @@ -1,4 +1,4 @@ -threads version 1.56 +threads version 1.57 ==================== This module exposes interpreter threads to the Perl level. diff --git a/ext/threads/t/exit.t b/ext/threads/t/exit.t index a23d394..49549a9 100644 --- a/ext/threads/t/exit.t +++ b/ext/threads/t/exit.t @@ -56,7 +56,7 @@ my $rc = $thr->join(); ok(! defined($rc), 'Exited: threads->exit()'); -run_perl(prog => 'use threads 1.56;' . +run_perl(prog => 'use threads 1.57;' . 'threads->exit(86);' . 'exit(99);', nolib => ($ENV{PERL_CORE}) ? 0 : 1, @@ -104,7 +104,7 @@ $rc = $thr->join(); ok(! defined($rc), 'Exited: $thr->set_thread_exit_only'); -run_perl(prog => 'use threads 1.56 qw(exit thread_only);' . +run_perl(prog => 'use threads 1.57 qw(exit thread_only);' . 'threads->create(sub { exit(99); })->join();' . 'exit(86);', nolib => ($ENV{PERL_CORE}) ? 0 : 1, @@ -112,10 +112,11 @@ run_perl(prog => 'use threads 1.56 qw(exit thread_only);' . is($?>>8, 86, "'use threads 'exit' => 'thread_only'"); -my $out = run_perl(prog => 'use threads 1.56;' . +my $out = run_perl(prog => 'use threads 1.57;' . 'threads->create(sub {' . ' exit(99);' . - '})->join();' . + '});' . + 'sleep(1);' . 'exit(86);', nolib => ($ENV{PERL_CORE}) ? 0 : 1, switches => ($ENV{PERL_CORE}) ? [] : [ '-Mblib' ], @@ -124,11 +125,12 @@ is($?>>8, 99, "exit(status) in thread"); like($out, '1 finished and unjoined', "exit(status) in thread"); -$out = run_perl(prog => 'use threads 1.56 qw(exit thread_only);' . +$out = run_perl(prog => 'use threads 1.57 qw(exit thread_only);' . 'threads->create(sub {' . ' threads->set_thread_exit_only(0);' . ' exit(99);' . - '})->join();' . + '});' . + 'sleep(1);' . 'exit(86);', nolib => ($ENV{PERL_CORE}) ? 0 : 1, switches => ($ENV{PERL_CORE}) ? [] : [ '-Mblib' ], @@ -137,7 +139,7 @@ is($?>>8, 99, "set_thread_exit_only(0)"); like($out, '1 finished and unjoined', "set_thread_exit_only(0)"); -run_perl(prog => 'use threads 1.56;' . +run_perl(prog => 'use threads 1.57;' . 'threads->create(sub {' . ' $SIG{__WARN__} = sub { exit(99); };' . ' die();' . diff --git a/ext/threads/t/join.t b/ext/threads/t/join.t index bebfd6d..0dd9514 100644 --- a/ext/threads/t/join.t +++ b/ext/threads/t/join.t @@ -28,7 +28,7 @@ BEGIN { } $| = 1; - print("1..17\n"); ### Number of tests that will be run ### + print("1..20\n"); ### Number of tests that will be run ### }; my $TEST; @@ -181,4 +181,51 @@ if ($^O eq 'linux') { $_->join for map threads->create(sub{ok($_, "stress newCONSTSUB")}), 1..2; } +{ + my $go : shared = 0; + + my $t = threads->create( sub { + lock($go); + cond_wait($go) until $go; + }); + + my $joiner = threads->create(sub { $_[0]->join }, $t); + + threads->yield(); + sleep 1; + eval { $t->join; }; + ok(($@ =~ /^Thread already joined at/)?1:0, "Join pending join"); + + { lock($go); $go = 1; cond_signal($go); } + $joiner->join; +} + +{ + my $go : shared = 0; + my $t = threads->create( sub { + eval { threads->self->join; }; + ok(($@ =~ /^Cannot join self/), "Join self"); + lock($go); $go = 1; cond_signal($go); + }); + + { lock ($go); cond_wait($go) until $go; } + $t->join; +} + +{ + my $go : shared = 0; + my $t = threads->create( sub { + lock($go); cond_wait($go) until $go; + }); + my $joiner = threads->create(sub { $_[0]->join; }, $t); + + threads->yield(); + sleep 1; + eval { $t->detach }; + ok(($@ =~ /^Cannot detach a joined thread at/)?1:0, "Detach pending join"); + + { lock($go); $go = 1; cond_signal($go); } + $joiner->join; +} + # EOF diff --git a/ext/threads/t/state.t b/ext/threads/t/state.t index 80724db..b786840 100644 --- a/ext/threads/t/state.t +++ b/ext/threads/t/state.t @@ -28,7 +28,7 @@ BEGIN { } $| = 1; - print("1..53\n"); ### Number of tests that will be run ### + print("1..59\n"); ### Number of tests that will be run ### }; my $TEST; @@ -180,11 +180,87 @@ ok($thr->is_detached(), 'thread detached'); ok(! $thr->is_joinable(), 'thread not joinable'); ok(threads->list(threads::joinable) == 0, 'thread joinable list'); -threads->create(sub { - ok(! threads->is_detached(), 'thread not detached'); - ok(threads->list(threads::running) == 1, 'thread running list'); - ok(threads->list(threads::joinable) == 0, 'thread joinable list'); - ok(threads->list(threads::all) == 1, 'thread list'); -})->join(); +{ + my $go : shared = 0; + my $t = threads->create( sub { + ok(! threads->is_detached(), 'thread not detached'); + ok(threads->list(threads::running) == 1, 'thread running list'); + ok(threads->list(threads::joinable) == 0, 'thread joinable list'); + ok(threads->list(threads::all) == 1, 'thread list'); + lock($go); $go = 1; cond_signal($go); + }); + + { lock ($go); cond_wait($go) until $go; } + $t->join; +} + +{ + my $rdy :shared = 0; + sub thr_ready + { + lock($rdy); + $rdy++; + cond_signal($rdy); + } + + my $go :shared = 0; + sub thr_wait + { + lock($go); + cond_wait($go) until $go; + } + + my $done :shared = 0; + sub thr_done + { + lock($done); + $done++; + cond_signal($done); + } + + my $thr_routine = sub { thr_ready(); thr_wait(); thr_done(); }; + + # Create 8 threads: + # 3 running, blocking on $go + # 2 running, blocking on $go, join pending + # 2 running, blocking on join of above + # 1 finished, unjoined + + for (1..3) { threads->create($thr_routine); } + + foreach my $t (map {threads->create($thr_routine)} 1..2) { + threads->create(sub { thr_ready(); $_[0]->join; thr_done(); }, $t); + } + threads->create(sub { thr_ready(); thr_done(); }); + { + lock($done); + cond_wait($done) until ($done == 1); + } + { + lock($rdy); + cond_wait($rdy) until ($rdy == 8); + } + threads->yield(); + sleep(1); + + ok(threads->list(threads::running) == 5, 'thread running list'); + ok(threads->list(threads::joinable) == 1, 'thread joinable list'); + ok(threads->list(threads::all) == 6, 'thread all list'); + + { lock($go); $go = 1; cond_broadcast($go); } + { + lock($done); + cond_wait($done) until ($done == 8); + } + threads->yield(); + sleep(1); + + ok(threads->list(threads::running) == 0, 'thread running list'); + # Two awaiting join() have completed + ok(threads->list(threads::joinable) == 6, 'thread joinable list'); + ok(threads->list(threads::all) == 6, 'thread all list'); + + for (threads->list) { $_->join; } +} # EOF diff --git a/ext/threads/t/thread.t b/ext/threads/t/thread.t index 3d3989e..7e71900 100644 --- a/ext/threads/t/thread.t +++ b/ext/threads/t/thread.t @@ -171,7 +171,7 @@ package main; # bugid #24165 -run_perl(prog => 'use threads 1.56;' . +run_perl(prog => 'use threads 1.57;' . 'sub a{threads->create(shift)} $t = a sub{};' . '$t->tid; $t->join; $t->tid', nolib => ($ENV{PERL_CORE}) ? 0 : 1, diff --git a/ext/threads/threads.pm b/ext/threads/threads.pm index eff472f..69214b3 100755 --- a/ext/threads/threads.pm +++ b/ext/threads/threads.pm @@ -5,7 +5,7 @@ use 5.008; use strict; use warnings; -our $VERSION = '1.56'; +our $VERSION = '1.57'; my $XS_VERSION = $VERSION; $VERSION = eval $VERSION; @@ -133,7 +133,7 @@ threads - Perl interpreter-based threads =head1 VERSION -This document describes threads version 1.56 +This document describes threads version 1.57 =head1 SYNOPSIS @@ -373,7 +373,7 @@ list of all non-joined, non-detached I objects. In a scalar context, returns a count of the same. With a I argument (using C), returns a list of all -non-detached I objects that are still running. +non-joined, non-detached I objects that are still running. With a I argument (using C), returns a list of all non-joined, non-detached I objects that have finished running (i.e., @@ -949,7 +949,7 @@ L Discussion Forum on CPAN: L Annotated POD for L: -L +L L, L diff --git a/ext/threads/threads.xs b/ext/threads/threads.xs index cc4e7c9..f15e40e 100755 --- a/ext/threads/threads.xs +++ b/ext/threads/threads.xs @@ -47,6 +47,7 @@ typedef perl_os_thread pthread_t; /* Values for 'state' member */ #define PERL_ITHR_DETACHED 1 #define PERL_ITHR_JOINED 2 +#define PERL_ITHR_UNCALLABLE (PERL_ITHR_DETACHED|PERL_ITHR_JOINED) #define PERL_ITHR_FINISHED 4 #define PERL_ITHR_THREAD_EXIT_ONLY 8 #define PERL_ITHR_NONVIABLE 16 @@ -138,7 +139,7 @@ S_ithread_clear(pTHX_ ithread *thread) PerlInterpreter *interp; assert(((thread->state & PERL_ITHR_FINISHED) && - (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED))) + (thread->state & PERL_ITHR_UNCALLABLE)) || (thread->state & PERL_ITHR_NONVIABLE)); @@ -187,7 +188,7 @@ S_ithread_destruct(pTHX_ ithread *thread) destroy = 1; } else if (! (thread->state & PERL_ITHR_FINISHED)) { destroy = 0; - } else if (! (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED))) { + } else if (! (thread->state & PERL_ITHR_UNCALLABLE)) { destroy = 0; } else { thread->state |= PERL_ITHR_DESTROYED; @@ -847,8 +848,10 @@ ithread_create(...) /* $thr->create() */ classname = HvNAME(SvSTASH(SvRV(ST(0)))); thread = INT2PTR(ithread *, SvIV(SvRV(ST(0)))); + MUTEX_LOCK(&thread->mutex); stack_size = thread->stack_size; exit_opt = thread->state & PERL_ITHR_THREAD_EXIT_ONLY; + MUTEX_UNLOCK(&thread->mutex); } else { /* threads->create() */ classname = (char *)SvPV_nolen(ST(0)); @@ -952,6 +955,7 @@ ithread_list(...) int list_context; IV count = 0; int want_running = 0; + int state; dMY_POOL; PPCODE: /* Class method only */ @@ -974,19 +978,23 @@ ithread_list(...) thread != &MY_POOL.main_thread; thread = thread->next) { + MUTEX_LOCK(&thread->mutex); + state = thread->state; + MUTEX_UNLOCK(&thread->mutex); + /* Ignore detached or joined threads */ - if (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)) { + if (state & PERL_ITHR_UNCALLABLE) { continue; } /* Filter per parameter */ if (items > 1) { if (want_running) { - if (thread->state & PERL_ITHR_FINISHED) { + if (state & PERL_ITHR_FINISHED) { continue; /* Not running */ } } else { - if (! (thread->state & PERL_ITHR_FINISHED)) { + if (! (state & PERL_ITHR_FINISHED)) { continue; /* Still running - not joinable yet */ } } @@ -1038,6 +1046,7 @@ void ithread_join(...) PREINIT: ithread *thread; + ithread *current_thread; int join_err; AV *params; int len; @@ -1045,6 +1054,7 @@ ithread_join(...) #ifdef WIN32 DWORD waitcode; #else + int rc_join; void *retval; #endif dMY_POOL; @@ -1054,42 +1064,56 @@ ithread_join(...) Perl_croak(aTHX_ "Usage: $thr->join()"); } - /* Check if the thread is joinable */ + /* Check if the thread is joinable and not ourselves */ thread = S_SV_to_ithread(aTHX_ ST(0)); - join_err = (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)); - if (join_err) { - if (join_err & PERL_ITHR_DETACHED) { - Perl_croak(aTHX_ "Cannot join a detached thread"); - } else { - Perl_croak(aTHX_ "Thread already joined"); - } + current_thread = S_ithread_get(aTHX); + + MUTEX_LOCK(&thread->mutex); + if ((join_err = (thread->state & PERL_ITHR_UNCALLABLE))) { + MUTEX_UNLOCK(&thread->mutex); + Perl_croak(aTHX_ (join_err & PERL_ITHR_DETACHED) + ? "Cannot join a detached thread" + : "Thread already joined"); + } else if (thread->tid == current_thread->tid) { + MUTEX_UNLOCK(&thread->mutex); + Perl_croak(aTHX_ "Cannot join self"); } + /* Mark as joined */ + thread->state |= PERL_ITHR_JOINED; + MUTEX_UNLOCK(&thread->mutex); + + MUTEX_LOCK(&MY_POOL.create_destruct_mutex); + MY_POOL.joinable_threads--; + MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex); + /* Join the thread */ #ifdef WIN32 - waitcode = WaitForSingleObject(thread->handle, INFINITE); + if (WaitForSingleObject(thread->handle, INFINITE) != WAIT_OBJECT_0) { + /* Timeout/abandonment unexpected here; check $^E */ + Perl_croak(aTHX_ "PANIC: underlying join failed"); + }; #else - pthread_join(thread->thr, &retval); + if ((rc_join = pthread_join(thread->thr, &retval)) != 0) { + /* In progress/deadlock/unknown unexpected here; check $! */ + errno = rc_join; + Perl_croak(aTHX_ "PANIC: underlying join failed"); + }; #endif MUTEX_LOCK(&thread->mutex); - /* Mark as joined */ - thread->state |= PERL_ITHR_JOINED; - /* Get the return value from the call_sv */ /* Objects do not survive this process - FIXME */ { AV *params_copy; PerlInterpreter *other_perl; CLONE_PARAMS clone_params; - ithread *current_thread; params_copy = (AV *)SvRV(thread->params); other_perl = thread->interp; clone_params.stashes = newAV(); clone_params.flags = CLONEf_JOIN_IN; PL_ptr_table = ptr_table_new(); - current_thread = S_ithread_get(aTHX); S_ithread_set(aTHX_ thread); /* Ensure 'meaningful' addresses retain their meaning */ ptr_table_store(PL_ptr_table, &other_perl->Isv_undef, &PL_sv_undef); @@ -1109,12 +1133,6 @@ ithread_join(...) } MUTEX_UNLOCK(&thread->mutex); - MUTEX_LOCK(&MY_POOL.create_destruct_mutex); - if (! (thread->state & PERL_ITHR_DETACHED)) { - MY_POOL.joinable_threads--; - } - MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex); - /* Try to cleanup thread */ S_ithread_destruct(aTHX_ thread); @@ -1150,34 +1168,34 @@ ithread_detach(...) CODE: PERL_UNUSED_VAR(items); - /* Check if the thread is detachable */ - thread = S_SV_to_ithread(aTHX_ ST(0)); - if ((detach_err = (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)))) { - if (detach_err & PERL_ITHR_DETACHED) { - Perl_croak(aTHX_ "Thread already detached"); - } else { - Perl_croak(aTHX_ "Cannot detach a joined thread"); - } - } - /* Detach the thread */ + thread = S_SV_to_ithread(aTHX_ ST(0)); MUTEX_LOCK(&MY_POOL.create_destruct_mutex); MUTEX_LOCK(&thread->mutex); - thread->state |= PERL_ITHR_DETACHED; + if (! (detach_err = (thread->state & PERL_ITHR_UNCALLABLE))) { + /* Thread is detachable */ + thread->state |= PERL_ITHR_DETACHED; #ifdef WIN32 - /* Windows has no 'detach thread' function */ + /* Windows has no 'detach thread' function */ #else - PERL_THREAD_DETACH(thread->thr); + PERL_THREAD_DETACH(thread->thr); #endif - if (thread->state & PERL_ITHR_FINISHED) { - MY_POOL.joinable_threads--; - } else { - MY_POOL.running_threads--; - MY_POOL.detached_threads++; + if (thread->state & PERL_ITHR_FINISHED) { + MY_POOL.joinable_threads--; + } else { + MY_POOL.running_threads--; + MY_POOL.detached_threads++; + } } MUTEX_UNLOCK(&thread->mutex); MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex); + if (detach_err) { + Perl_croak(aTHX_ (detach_err & PERL_ITHR_DETACHED) + ? "Thread already detached" + : "Cannot detach a joined thread"); + } + /* If thread is finished and didn't die, * then we can free its interpreter */ MUTEX_LOCK(&thread->mutex); @@ -1272,6 +1290,7 @@ ithread_object(...) char *classname; UV tid; ithread *thread; + int state; int have_obj = 0; dMY_POOL; CODE: @@ -1297,7 +1316,10 @@ ithread_object(...) /* Look for TID */ if (thread->tid == tid) { /* Ignore if detached or joined */ - if (! (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED))) { + MUTEX_LOCK(&thread->mutex); + state = thread->state; + MUTEX_UNLOCK(&thread->mutex); + if (! (state & PERL_ITHR_UNCALLABLE)) { /* Put object on stack */ ST(0) = sv_2mortal(S_ithread_to_SV(aTHX_ Nullsv, thread, classname, TRUE)); have_obj = 1; @@ -1377,7 +1399,9 @@ ithread_is_running(...) } thread = INT2PTR(ithread *, SvIV(SvRV(ST(0)))); + MUTEX_LOCK(&thread->mutex); ST(0) = (thread->state & PERL_ITHR_FINISHED) ? &PL_sv_no : &PL_sv_yes; + MUTEX_UNLOCK(&thread->mutex); /* XSRETURN(1); - implied */ @@ -1388,7 +1412,9 @@ ithread_is_detached(...) CODE: PERL_UNUSED_VAR(items); thread = S_SV_to_ithread(aTHX_ ST(0)); + MUTEX_LOCK(&thread->mutex); ST(0) = (thread->state & PERL_ITHR_DETACHED) ? &PL_sv_yes : &PL_sv_no; + MUTEX_UNLOCK(&thread->mutex); /* XSRETURN(1); - implied */ @@ -1405,7 +1431,7 @@ ithread_is_joinable(...) thread = INT2PTR(ithread *, SvIV(SvRV(ST(0)))); MUTEX_LOCK(&thread->mutex); ST(0) = ((thread->state & PERL_ITHR_FINISHED) && - ! (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED))) + ! (thread->state & PERL_ITHR_UNCALLABLE)) ? &PL_sv_yes : &PL_sv_no; MUTEX_UNLOCK(&thread->mutex); /* XSRETURN(1); - implied */ -- 2.7.4