From ac9d3a9d44cf24ffa50574d1a7d6c147fd438c5f Mon Sep 17 00:00:00 2001 From: "Jerry D. Hedden" Date: Thu, 8 May 2008 06:05:51 -0400 Subject: [PATCH] Thread::Queue 2.07 From: "Jerry D. Hedden" Message-ID: <1ff86f510805080705p3cc8f657i7a1441da5b0a273b@mail.gmail.com> p4raw-id: //depot/perl@33808 --- lib/Thread/Queue.pm | 64 +++++++++++++++++++++++++++++++------------- lib/Thread/Queue/t/02_refs.t | 46 +++++++++++++++++++++++++++++-- 2 files changed, 89 insertions(+), 21 deletions(-) diff --git a/lib/Thread/Queue.pm b/lib/Thread/Queue.pm index 0d9eb10..dc2b1ed 100644 --- a/lib/Thread/Queue.pm +++ b/lib/Thread/Queue.pm @@ -3,10 +3,10 @@ package Thread::Queue; use strict; use warnings; -our $VERSION = '2.06'; +our $VERSION = '2.07'; use threads::shared 0.96; -use Scalar::Util 1.10 qw(looks_like_number); +use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); # Predeclarations for internal functions my ($make_shared, $validate_count, $validate_index); @@ -15,7 +15,7 @@ my ($make_shared, $validate_count, $validate_index); sub new { my $class = shift; - my @queue :shared = map { $make_shared->($_) } @_; + my @queue :shared = map { $make_shared->($_, {}) } @_; return bless(\@queue, $class); } @@ -24,7 +24,7 @@ sub enqueue { my $queue = shift; lock(@$queue); - push(@$queue, map { $make_shared->($_) } @_) + push(@$queue, map { $make_shared->($_, {}) } @_) and cond_signal(@$queue); } @@ -111,7 +111,7 @@ sub insert } # Add new items to the queue - push(@$queue, map { $make_shared->($_) } @_); + push(@$queue, map { $make_shared->($_, {}) } @_); # Add previous items back onto the queue push(@$queue, @tmp); @@ -163,7 +163,7 @@ sub extract # Create a thread-shared version of a complex data structure or object $make_shared = sub { - my $item = shift; + my ($item, $cloned) = @_; # If not running 'threads' or already thread-shared, # then just return the input item @@ -172,22 +172,34 @@ $make_shared = sub { # Make copies of array, hash and scalar refs my $copy; - if (my $ref_type = Scalar::Util::reftype($item)) { + if (my $ref_type = reftype($item)) { + # Check for previously cloned references + # (this takes care of circular refs as well) + my $addr = refaddr($item); + if (defined($addr) && exists($cloned->{$addr})) { + # Return the already existing clone + return $cloned->{$addr}; + } + # Copy an array ref if ($ref_type eq 'ARRAY') { # Make empty shared array ref $copy = &share([]); + # Add to clone checking hash + $cloned->{$addr} = $copy; # Recursively copy and add contents - push(@$copy, map { $make_shared->($_) } @$item); + push(@$copy, map { $make_shared->($_, $cloned) } @$item); } # Copy a hash ref elsif ($ref_type eq 'HASH') { # Make empty shared hash ref $copy = &share({}); + # Add to clone checking hash + $cloned->{$addr} = $copy; # Recursively copy and add contents foreach my $key (keys(%{$item})) { - $copy->{$key} = $make_shared->($item->{$key}); + $copy->{$key} = $make_shared->($item->{$key}, $cloned); } } @@ -199,13 +211,27 @@ $make_shared = sub { if (Internals::SvREADONLY($$item)) { Internals::SvREADONLY($$copy, 1); } + # Add to clone checking hash + $cloned->{$addr} = $copy; } # Copy of a ref of a ref elsif ($ref_type eq 'REF') { - my $tmp = $make_shared->($$item); - $copy = \$tmp; - share($copy); + # Special handling for $x = \$x + my $addr2 = refaddr($$item); + if ($addr2 == $addr) { + $copy = \$copy; + share($copy); + $cloned->{$addr} = $copy; + } else { + my $tmp; + $copy = \$tmp; + share($copy); + # Add to clone checking hash + $cloned->{$addr} = $copy; + # Recursively copy and add contents + $tmp = $make_shared->($$item, $cloned); + } } } @@ -214,16 +240,16 @@ $make_shared = sub { # other than an ordinary scalar return $item if (! defined($copy)); + # If input item is an object, then bless the copy into the same class + if (my $class = blessed($item)) { + bless($copy, $class); + } + # Clone READONLY flag if (Internals::SvREADONLY($item)) { Internals::SvREADONLY($copy, 1); } - # If input item is an object, then bless the copy into the same class - if (my $class = Scalar::Util::blessed($item)) { - bless($copy, $class); - } - return $copy; }; @@ -265,7 +291,7 @@ Thread::Queue - Thread-safe queues =head1 VERSION -This document describes Thread::Queue version 2.06 +This document describes Thread::Queue version 2.07 =head1 SYNOPSIS @@ -518,7 +544,7 @@ Thread::Queue Discussion Forum on CPAN: L Annotated POD for Thread::Queue: -L +L Source repository: L diff --git a/lib/Thread/Queue/t/02_refs.t b/lib/Thread/Queue/t/02_refs.t index 388cc6d..b09eca2 100644 --- a/lib/Thread/Queue/t/02_refs.t +++ b/lib/Thread/Queue/t/02_refs.t @@ -23,7 +23,7 @@ if ($] == 5.008) { require Test::More; } Test::More->import(); -plan('tests' => 39); +plan('tests' => 45); # Regular array my @ary1 = qw/foo bar baz/; @@ -62,6 +62,20 @@ my $baz = \$bar; my $qux = \$baz; is_deeply($$$$qux, $foo, 'Ref of ref'); +# Circular refs +my $cir1; +$cir1 = \$cir1; + +my $cir1s : shared; +$cir1s = \$cir1s; + +my $cir2; +$cir2 = [ \$cir2, { 'ref' => \$cir2 } ]; + +my $cir3 :shared = &share({}); +$cir3->{'self'} = \$cir3; +bless($cir3, 'Circular'); + # Queue up items my $q = Thread::Queue->new(\@ary1, \@ary2); ok($q, 'New queue'); @@ -70,10 +84,12 @@ $q->enqueue($obj1, $obj2); is($q->pending(), 4, 'Queue count'); $q->enqueue($sref1, $sref2, $qux); is($q->pending(), 7, 'Queue count'); +$q->enqueue($cir1, $cir1s, $cir2, $cir3); +is($q->pending(), 11, 'Queue count'); # Process items in thread threads->create(sub { - is($q->pending(), 7, 'Queue count in thread'); + is($q->pending(), 11, 'Queue count in thread'); my $tary1 = $q->dequeue(); ok($tary1, 'Thread got item'); @@ -119,6 +135,32 @@ threads->create(sub { my $qux = $q->dequeue(); is_deeply($$$$qux, $foo, 'Ref of ref'); + my ($c1, $c1s, $c2, $c3) = $q->dequeue(4); + SKIP: { + skip("Needs threads::shared >= 1.19", 5) + if ($threads::shared::VERSION < 1.19); + + is(threads::shared::_id($$c1), + threads::shared::_id($c1), + 'Circular ref - scalar'); + + is(threads::shared::_id($$c1s), + threads::shared::_id($c1s), + 'Circular ref - shared scalar'); + + is(threads::shared::_id(${$c2->[0]}), + threads::shared::_id($c2), + 'Circular ref - array'); + + is(threads::shared::_id(${$c2->[1]->{'ref'}}), + threads::shared::_id($c2), + 'Circular ref - mixed'); + + is(threads::shared::_id(${$c3->{'self'}}), + threads::shared::_id($c3), + 'Circular ref - hash'); + } + is($q->pending(), 0, 'Empty queue'); my $nothing = $q->dequeue_nb(); ok(! defined($nothing), 'Nothing on queue'); -- 2.7.4