From c7bac10acd8724f859ddcb81408cfdbeee046f8f Mon Sep 17 00:00:00 2001 From: "Jerry D. Hedden" Date: Tue, 19 Feb 2013 12:23:46 -0800 Subject: [PATCH] [perl #116865] Upgrade to Thread::Queue 3.02 Attached patch updates Thread::Queue to v3.0.2 in preparation for a CPAN release. Adds a new method to Thread::Queue to dequeue items with a timeout feature. This addition was suggested by Andreas Huber. Signed-off-by: Chris 'BinGOs' Williams --- MANIFEST | 1 + Porting/Maintainers.pl | 2 +- dist/Thread-Queue/lib/Thread/Queue.pm | 75 +++++++++++++++++++++++++++++++++-- dist/Thread-Queue/t/10_timed.t | 66 ++++++++++++++++++++++++++++++ 4 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 dist/Thread-Queue/t/10_timed.t diff --git a/MANIFEST b/MANIFEST index a941a01..b1090bc 100644 --- a/MANIFEST +++ b/MANIFEST @@ -3565,6 +3565,7 @@ dist/Thread-Queue/t/06_insert.t Thread::Queue tests dist/Thread-Queue/t/07_lock.t Thread::Queue tests dist/Thread-Queue/t/08_nothreads.t Thread::Queue tests dist/Thread-Queue/t/09_ended.t Thread::Queue tests +dist/Thread-Queue/t/10_timed.t Thread::Queue tests dist/Thread-Semaphore/lib/Thread/Semaphore.pm Thread-safe semaphores dist/Thread-Semaphore/t/01_basic.t Thread::Semaphore tests dist/Thread-Semaphore/t/02_errs.t Thread::Semaphore tests diff --git a/Porting/Maintainers.pl b/Porting/Maintainers.pl index 0bcda65..5ede296 100755 --- a/Porting/Maintainers.pl +++ b/Porting/Maintainers.pl @@ -1872,7 +1872,7 @@ use File::Glob qw(:case); 'Thread::Queue' => { 'MAINTAINER' => 'jdhedden', - 'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-3.01.tar.gz', + 'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-3.02.tar.gz', 'FILES' => q[dist/Thread-Queue], 'EXCLUDED' => [ qr{^examples/}, diff --git a/dist/Thread-Queue/lib/Thread/Queue.pm b/dist/Thread-Queue/lib/Thread/Queue.pm index 0bf1624..027dd56 100644 --- a/dist/Thread-Queue/lib/Thread/Queue.pm +++ b/dist/Thread-Queue/lib/Thread/Queue.pm @@ -3,7 +3,7 @@ package Thread::Queue; use strict; use warnings; -our $VERSION = '3.01'; +our $VERSION = '3.02'; $VERSION = eval $VERSION; use threads::shared 1.21; @@ -13,7 +13,7 @@ use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); our @CARP_NOT = ("threads::shared"); # Predeclarations for internal functions -my ($validate_count, $validate_index); +my ($validate_count, $validate_index, $validate_timeout); # Create a new queue possibly pre-populated with items sub new @@ -103,6 +103,32 @@ sub dequeue_nb return @items; } +# Return items from the head of a queue, blocking if needed up to a timeout +sub dequeue_timed +{ + my $self = shift; + lock(%$self); + my $queue = $$self{'queue'}; + + # Timeout may be relative or absolute + my $timeout = @_ ? $validate_timeout->(shift) : -1; + # Convert to an absolute time for use with cond_timedwait() + if ($timeout < 32000000) { # More than one year + $timeout += time(); + } + + my $count = @_ ? $validate_count->(shift) : 1; + + # Wait for requisite number of items, or until timeout + while ((@$queue < $count) && ! $$self{'ENDED'}) { + last if (! cond_timedwait(%$self, $timeout)); + } + cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'}); + + # Get whatever we need off the queue if available + return $self->dequeue_nb($count); +} + # Return an item without removing it from a queue sub peek { @@ -232,6 +258,23 @@ $validate_count = sub { return $count; }; +# Check value of the requested timeout +$validate_timeout = sub { + my $timeout = shift; + + if (! defined($timeout) || + ! looks_like_number($timeout)) + { + require Carp; + my ($method) = (caller(1))[3]; + $method =~ s/Thread::Queue:://; + $timeout = 'undef' if (! defined($timeout)); + Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method"); + } + + return $timeout; +}; + 1; =head1 NAME @@ -240,7 +283,7 @@ Thread::Queue - Thread-safe queues =head1 VERSION -This document describes Thread::Queue version 3.01 +This document describes Thread::Queue version 3.02 =head1 SYNOPSIS @@ -280,6 +323,11 @@ This document describes Thread::Queue version 3.01 # Work on $item } + # Blocking dequeue with 5-second timeout + if (defined(my $item = $q->dequeue_timed(5))) { + # Work on $item + } + # Get the second item in the queue without dequeuing anything my $item = $q->peek(1); @@ -381,6 +429,27 @@ number of items, then it immediately (i.e., non-blocking) returns whatever items there are on the queue. If the queue is empty, then C is returned. +=item ->dequeue_timed(TIMEOUT) + +=item ->dequeue_timed(TIMEOUT, COUNT) + +Removes the requested number of items (default is 1) from the head of the +queue, and returns them. If the queue contains fewer than the requested +number of items, then the thread will be blocked until the requisite number of +items are available, or until the timeout is reached. If the timeout is +reached, it returns whatever items there are on the queue, or C if the +queue is empty. + +The timeout may be a number of seconds relative to the current time (e.g., 5 +seconds from when the call is made), or may be an absolute timeout in I +seconds the same as would be used with +L. +Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of +the underlying implementation). + +If C is missing, c, or less than or equal to 0, then this call +behaves the same as C. + =item ->pending() Returns the number of items still in the queue. Returns C if the queue diff --git a/dist/Thread-Queue/t/10_timed.t b/dist/Thread-Queue/t/10_timed.t new file mode 100644 index 0000000..8404720 --- /dev/null +++ b/dist/Thread-Queue/t/10_timed.t @@ -0,0 +1,66 @@ +use strict; +use warnings; + +BEGIN { + use Config; + if (! $Config{'useithreads'}) { + print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); + exit(0); + } +} + +use threads; +use Thread::Queue; + +if ($] == 5.008) { + require 't/test.pl'; # Test::More work-alike for Perl 5.8.0 +} else { + require Test::More; +} +Test::More->import(); +plan('tests' => 19); + +### ->dequeue_timed(TIMEOUT, COUNT) test ### + +my $q = Thread::Queue->new(); +ok($q, 'New queue'); + +my @items = qw/foo bar baz qux exit/; +$q->enqueue(@items); +is($q->pending(), scalar(@items), 'Queue count'); + +threads->create(sub { + is($q->pending(), scalar(@items), 'Queue count in thread'); + while (my @el = $q->dequeue_timed(2.5, 2)) { + is($el[0], shift(@items), "Thread got $el[0]"); + if ($el[0] eq 'exit') { + is(scalar(@el), 1, 'Thread to exit'); + } else { + is($el[1], shift(@items), "Thread got $el[1]"); + } + } + is($q->pending(), 0, 'Empty queue'); + $q->enqueue('done'); +})->join(); + +is($q->pending(), 1, 'Queue count after thread'); +is($q->dequeue(), 'done', 'Thread reported done'); +is($q->pending(), 0, 'Empty queue'); + +### ->dequeue_timed(TIMEOUT) test on empty queue ### + +threads->create(sub { + is($q->pending(), 0, 'Empty queue in thread'); + my @el = $q->dequeue_timed(1.5); + is($el[0], undef, "Thread got no items"); + is($q->pending(), 0, 'Empty queue in thread'); + $q->enqueue('done'); +})->join(); + +is($q->pending(), 1, 'Queue count after thread'); +is($q->dequeue(), 'done', 'Thread reported done'); +is($q->pending(), 0, 'Empty queue'); + +exit(0); + +# EOF -- 2.7.4