use strict;
use warnings;
-our $VERSION = '3.01';
+our $VERSION = '3.02';
$VERSION = eval $VERSION;
use threads::shared 1.21;
our @CARP_NOT = ("threads::shared");
# Predeclarations for internal functions
-my ($validate_count, $validate_index);
+my ($validate_count, $validate_index, $validate_timeout);
# Create a new queue possibly pre-populated with items
sub new
return @items;
}
+# Return items from the head of a queue, blocking if needed up to a timeout
+sub dequeue_timed
+{
+ my $self = shift;
+ lock(%$self);
+ my $queue = $$self{'queue'};
+
+ # Timeout may be relative or absolute
+ my $timeout = @_ ? $validate_timeout->(shift) : -1;
+ # Convert to an absolute time for use with cond_timedwait()
+ if ($timeout < 32000000) { # More than one year
+ $timeout += time();
+ }
+
+ my $count = @_ ? $validate_count->(shift) : 1;
+
+ # Wait for requisite number of items, or until timeout
+ while ((@$queue < $count) && ! $$self{'ENDED'}) {
+ last if (! cond_timedwait(%$self, $timeout));
+ }
+ cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
+
+ # Get whatever we need off the queue if available
+ return $self->dequeue_nb($count);
+}
+
# Return an item without removing it from a queue
sub peek
{
return $count;
};
+# Check value of the requested timeout
+$validate_timeout = sub {
+ my $timeout = shift;
+
+ if (! defined($timeout) ||
+ ! looks_like_number($timeout))
+ {
+ require Carp;
+ my ($method) = (caller(1))[3];
+ $method =~ s/Thread::Queue:://;
+ $timeout = 'undef' if (! defined($timeout));
+ Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
+ }
+
+ return $timeout;
+};
+
1;
=head1 NAME
=head1 VERSION
-This document describes Thread::Queue version 3.01
+This document describes Thread::Queue version 3.02
=head1 SYNOPSIS
# Work on $item
}
+ # Blocking dequeue with 5-second timeout
+ if (defined(my $item = $q->dequeue_timed(5))) {
+ # Work on $item
+ }
+
# Get the second item in the queue without dequeuing anything
my $item = $q->peek(1);
items there are on the queue. If the queue is empty, then C<undef> is
returned.
+=item ->dequeue_timed(TIMEOUT)
+
+=item ->dequeue_timed(TIMEOUT, COUNT)
+
+Removes the requested number of items (default is 1) from the head of the
+queue, and returns them. If the queue contains fewer than the requested
+number of items, then the thread will be blocked until the requisite number of
+items are available, or until the timeout is reached. If the timeout is
+reached, it returns whatever items there are on the queue, or C<undef> if the
+queue is empty.
+
+The timeout may be a number of seconds relative to the current time (e.g., 5
+seconds from when the call is made), or may be an absolute timeout in I<epoch>
+seconds the same as would be used with
+L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
+Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
+the underlying implementation).
+
+If C<TIMEOUT> is missing, c<undef>, or less than or equal to 0, then this call
+behaves the same as C<dequeue_nb>.
+
=item ->pending()
Returns the number of items still in the queue. Returns C<undef> if the queue
--- /dev/null
+use strict;
+use warnings;
+
+BEGIN {
+ use Config;
+ if (! $Config{'useithreads'}) {
+ print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
+ exit(0);
+ }
+}
+
+use threads;
+use Thread::Queue;
+
+if ($] == 5.008) {
+ require 't/test.pl'; # Test::More work-alike for Perl 5.8.0
+} else {
+ require Test::More;
+}
+Test::More->import();
+plan('tests' => 19);
+
+### ->dequeue_timed(TIMEOUT, COUNT) test ###
+
+my $q = Thread::Queue->new();
+ok($q, 'New queue');
+
+my @items = qw/foo bar baz qux exit/;
+$q->enqueue(@items);
+is($q->pending(), scalar(@items), 'Queue count');
+
+threads->create(sub {
+ is($q->pending(), scalar(@items), 'Queue count in thread');
+ while (my @el = $q->dequeue_timed(2.5, 2)) {
+ is($el[0], shift(@items), "Thread got $el[0]");
+ if ($el[0] eq 'exit') {
+ is(scalar(@el), 1, 'Thread to exit');
+ } else {
+ is($el[1], shift(@items), "Thread got $el[1]");
+ }
+ }
+ is($q->pending(), 0, 'Empty queue');
+ $q->enqueue('done');
+})->join();
+
+is($q->pending(), 1, 'Queue count after thread');
+is($q->dequeue(), 'done', 'Thread reported done');
+is($q->pending(), 0, 'Empty queue');
+
+### ->dequeue_timed(TIMEOUT) test on empty queue ###
+
+threads->create(sub {
+ is($q->pending(), 0, 'Empty queue in thread');
+ my @el = $q->dequeue_timed(1.5);
+ is($el[0], undef, "Thread got no items");
+ is($q->pending(), 0, 'Empty queue in thread');
+ $q->enqueue('done');
+})->join();
+
+is($q->pending(), 1, 'Queue count after thread');
+is($q->dequeue(), 'done', 'Thread reported done');
+is($q->pending(), 0, 'Empty queue');
+
+exit(0);
+
+# EOF