[perl #116865] Upgrade to Thread::Queue 3.02
authorJerry D. Hedden <jdhedden@cpan.org>
Tue, 19 Feb 2013 20:23:46 +0000 (12:23 -0800)
committerChris 'BinGOs' Williams <chris@bingosnet.co.uk>
Tue, 19 Feb 2013 22:21:01 +0000 (22:21 +0000)
Attached patch updates Thread::Queue to v3.0.2 in preparation for a
CPAN release.

Adds a new method to Thread::Queue to dequeue items with a timeout
feature.  This addition was suggested by Andreas Huber.

Signed-off-by: Chris 'BinGOs' Williams <chris@bingosnet.co.uk>
MANIFEST
Porting/Maintainers.pl
dist/Thread-Queue/lib/Thread/Queue.pm
dist/Thread-Queue/t/10_timed.t [new file with mode: 0644]

index a941a01..b1090bc 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -3565,6 +3565,7 @@ dist/Thread-Queue/t/06_insert.t           Thread::Queue tests
 dist/Thread-Queue/t/07_lock.t          Thread::Queue tests
 dist/Thread-Queue/t/08_nothreads.t     Thread::Queue tests
 dist/Thread-Queue/t/09_ended.t         Thread::Queue tests
+dist/Thread-Queue/t/10_timed.t Thread::Queue tests
 dist/Thread-Semaphore/lib/Thread/Semaphore.pm  Thread-safe semaphores
 dist/Thread-Semaphore/t/01_basic.t             Thread::Semaphore tests
 dist/Thread-Semaphore/t/02_errs.t              Thread::Semaphore tests
index 0bcda65..5ede296 100755 (executable)
@@ -1872,7 +1872,7 @@ use File::Glob qw(:case);
 
     'Thread::Queue' => {
         'MAINTAINER'   => 'jdhedden',
-        'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-3.01.tar.gz',
+        'DISTRIBUTION' => 'JDHEDDEN/Thread-Queue-3.02.tar.gz',
         'FILES'        => q[dist/Thread-Queue],
         'EXCLUDED'     => [
             qr{^examples/},
index 0bf1624..027dd56 100644 (file)
@@ -3,7 +3,7 @@ package Thread::Queue;
 use strict;
 use warnings;
 
-our $VERSION = '3.01';
+our $VERSION = '3.02';
 $VERSION = eval $VERSION;
 
 use threads::shared 1.21;
@@ -13,7 +13,7 @@ use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
 our @CARP_NOT = ("threads::shared");
 
 # Predeclarations for internal functions
-my ($validate_count, $validate_index);
+my ($validate_count, $validate_index, $validate_timeout);
 
 # Create a new queue possibly pre-populated with items
 sub new
@@ -103,6 +103,32 @@ sub dequeue_nb
     return @items;
 }
 
+# Return items from the head of a queue, blocking if needed up to a timeout
+sub dequeue_timed
+{
+    my $self = shift;
+    lock(%$self);
+    my $queue = $$self{'queue'};
+
+    # Timeout may be relative or absolute
+    my $timeout = @_ ? $validate_timeout->(shift) : -1;
+    # Convert to an absolute time for use with cond_timedwait()
+    if ($timeout < 32000000) {   # More than one year
+        $timeout += time();
+    }
+
+    my $count = @_ ? $validate_count->(shift) : 1;
+
+    # Wait for requisite number of items, or until timeout
+    while ((@$queue < $count) && ! $$self{'ENDED'}) {
+        last if (! cond_timedwait(%$self, $timeout));
+    }
+    cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
+
+    # Get whatever we need off the queue if available
+    return $self->dequeue_nb($count);
+}
+
 # Return an item without removing it from a queue
 sub peek
 {
@@ -232,6 +258,23 @@ $validate_count = sub {
     return $count;
 };
 
+# Check value of the requested timeout
+$validate_timeout = sub {
+    my $timeout = shift;
+
+    if (! defined($timeout) ||
+        ! looks_like_number($timeout))
+    {
+        require Carp;
+        my ($method) = (caller(1))[3];
+        $method =~ s/Thread::Queue:://;
+        $timeout = 'undef' if (! defined($timeout));
+        Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
+    }
+
+    return $timeout;
+};
+
 1;
 
 =head1 NAME
@@ -240,7 +283,7 @@ Thread::Queue - Thread-safe queues
 
 =head1 VERSION
 
-This document describes Thread::Queue version 3.01
+This document describes Thread::Queue version 3.02
 
 =head1 SYNOPSIS
 
@@ -280,6 +323,11 @@ This document describes Thread::Queue version 3.01
         # Work on $item
     }
 
+    # Blocking dequeue with 5-second timeout
+    if (defined(my $item = $q->dequeue_timed(5))) {
+        # Work on $item
+    }
+
     # Get the second item in the queue without dequeuing anything
     my $item = $q->peek(1);
 
@@ -381,6 +429,27 @@ number of items, then it immediately (i.e., non-blocking) returns whatever
 items there are on the queue.  If the queue is empty, then C<undef> is
 returned.
 
+=item ->dequeue_timed(TIMEOUT)
+
+=item ->dequeue_timed(TIMEOUT, COUNT)
+
+Removes the requested number of items (default is 1) from the head of the
+queue, and returns them.  If the queue contains fewer than the requested
+number of items, then the thread will be blocked until the requisite number of
+items are available, or until the timeout is reached.  If the timeout is
+reached, it returns whatever items there are on the queue, or C<undef> if the
+queue is empty.
+
+The timeout may be a number of seconds relative to the current time (e.g., 5
+seconds from when the call is made), or may be an absolute timeout in I<epoch>
+seconds the same as would be used with
+L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
+Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
+the underlying implementation).
+
+If C<TIMEOUT> is missing, c<undef>, or less than or equal to 0, then this call
+behaves the same as C<dequeue_nb>.
+
 =item ->pending()
 
 Returns the number of items still in the queue.  Returns C<undef> if the queue
diff --git a/dist/Thread-Queue/t/10_timed.t b/dist/Thread-Queue/t/10_timed.t
new file mode 100644 (file)
index 0000000..8404720
--- /dev/null
@@ -0,0 +1,66 @@
+use strict;
+use warnings;
+
+BEGIN {
+    use Config;
+    if (! $Config{'useithreads'}) {
+        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+        exit(0);
+    }
+}
+
+use threads;
+use Thread::Queue;
+
+if ($] == 5.008) {
+    require 't/test.pl';   # Test::More work-alike for Perl 5.8.0
+} else {
+    require Test::More;
+}
+Test::More->import();
+plan('tests' => 19);
+
+### ->dequeue_timed(TIMEOUT, COUNT) test ###
+
+my $q = Thread::Queue->new();
+ok($q, 'New queue');
+
+my @items = qw/foo bar baz qux exit/;
+$q->enqueue(@items);
+is($q->pending(), scalar(@items), 'Queue count');
+
+threads->create(sub {
+    is($q->pending(), scalar(@items), 'Queue count in thread');
+    while (my @el = $q->dequeue_timed(2.5, 2)) {
+        is($el[0], shift(@items), "Thread got $el[0]");
+        if ($el[0] eq 'exit') {
+            is(scalar(@el), 1, 'Thread to exit');
+        } else {
+            is($el[1], shift(@items), "Thread got $el[1]");
+        }
+    }
+    is($q->pending(), 0, 'Empty queue');
+    $q->enqueue('done');
+})->join();
+
+is($q->pending(), 1, 'Queue count after thread');
+is($q->dequeue(), 'done', 'Thread reported done');
+is($q->pending(), 0, 'Empty queue');
+
+### ->dequeue_timed(TIMEOUT) test on empty queue ###
+
+threads->create(sub {
+    is($q->pending(), 0, 'Empty queue in thread');
+    my @el = $q->dequeue_timed(1.5);
+    is($el[0], undef, "Thread got no items");
+    is($q->pending(), 0, 'Empty queue in thread');
+    $q->enqueue('done');
+})->join();
+
+is($q->pending(), 1, 'Queue count after thread');
+is($q->dequeue(), 'done', 'Thread reported done');
+is($q->pending(), 0, 'Empty queue');
+
+exit(0);
+
+# EOF