1Thread::Queue(3) User Contributed Perl Documentation Thread::Queue(3)
2
3
4
6 Thread::Queue - Thread-safe queues
7
9 This document describes Thread::Queue version 3.13
10
12 use strict;
13 use warnings;
14
15 use threads;
16 use Thread::Queue;
17
18 my $q = Thread::Queue->new(); # A new empty queue
19
20 # Worker thread
21 my $thr = threads->create(
22 sub {
23 # Thread will loop until no more work
24 while (defined(my $item = $q->dequeue())) {
25 # Do work on $item
26 ...
27 }
28 }
29 );
30
31 # Send work to the thread
32 $q->enqueue($item1, ...);
33 # Signal that there is no more work to be sent
34 $q->end();
35 # Join up with the thread when it finishes
36 $thr->join();
37
38 ...
39
40 # Count of items in the queue
41 my $left = $q->pending();
42
43 # Non-blocking dequeue
44 if (defined(my $item = $q->dequeue_nb())) {
45 # Work on $item
46 }
47
48 # Blocking dequeue with 5-second timeout
49 if (defined(my $item = $q->dequeue_timed(5))) {
50 # Work on $item
51 }
52
53 # Set a size for a queue
54 $q->limit = 5;
55
56 # Get the second item in the queue without dequeuing anything
57 my $item = $q->peek(1);
58
59 # Insert two items into the queue just behind the head
60 $q->insert(1, $item1, $item2);
61
62 # Extract the last two items on the queue
63 my ($item1, $item2) = $q->extract(-2, 2);
64
66 This module provides thread-safe FIFO queues that can be accessed
67 safely by any number of threads.
68
69 Any data types supported by threads::shared can be passed via queues:
70
71 Ordinary scalars
72 Array refs
73 Hash refs
74 Scalar refs
75 Objects based on the above
76
77 Ordinary scalars are added to queues as they are.
78
79 If not already thread-shared, the other complex data types will be
80 cloned (recursively, if needed, and including any "bless"ings and read-
81 only settings) into thread-shared structures before being placed onto a
82 queue.
83
84 For example, the following would cause Thread::Queue to create a empty,
85 shared array reference via "&shared([])", copy the elements 'foo',
86 'bar' and 'baz' from @ary into it, and then place that shared reference
87 onto the queue:
88
89 my @ary = qw/foo bar baz/;
90 $q->enqueue(\@ary);
91
92 However, for the following, the items are already shared, so their
93 references are added directly to the queue, and no cloning takes place:
94
95 my @ary :shared = qw/foo bar baz/;
96 $q->enqueue(\@ary);
97
98 my $obj = &shared({});
99 $$obj{'foo'} = 'bar';
100 $$obj{'qux'} = 99;
101 bless($obj, 'My::Class');
102 $q->enqueue($obj);
103
104 See "LIMITATIONS" for caveats related to passing objects via queues.
105
107 ->new()
108 Creates a new empty queue.
109
110 ->new(LIST)
111 Creates a new queue pre-populated with the provided list of items.
112
114 The following methods deal with queues on a FIFO basis.
115
116 ->enqueue(LIST)
117 Adds a list of items onto the end of the queue.
118
119 ->dequeue()
120 ->dequeue(COUNT)
121 Removes the requested number of items (default is 1) from the head
122 of the queue, and returns them. If the queue contains fewer than
123 the requested number of items, then the thread will be blocked
124 until the requisite number of items are available (i.e., until
125 other threads "enqueue" more items).
126
127 ->dequeue_nb()
128 ->dequeue_nb(COUNT)
129 Removes the requested number of items (default is 1) from the head
130 of the queue, and returns them. If the queue contains fewer than
131 the requested number of items, then it immediately (i.e., non-
132 blocking) returns whatever items there are on the queue. If the
133 queue is empty, then "undef" is returned.
134
135 ->dequeue_timed(TIMEOUT)
136 ->dequeue_timed(TIMEOUT, COUNT)
137 Removes the requested number of items (default is 1) from the head
138 of the queue, and returns them. If the queue contains fewer than
139 the requested number of items, then the thread will be blocked
140 until the requisite number of items are available, or until the
141 timeout is reached. If the timeout is reached, it returns whatever
142 items there are on the queue, or "undef" if the queue is empty.
143
144 The timeout may be a number of seconds relative to the current time
145 (e.g., 5 seconds from when the call is made), or may be an absolute
146 timeout in epoch seconds the same as would be used with
147 cond_timedwait(). Fractional seconds (e.g., 2.5 seconds) are also
148 supported (to the extent of the underlying implementation).
149
150 If "TIMEOUT" is missing, "undef", or less than or equal to 0, then
151 this call behaves the same as "dequeue_nb".
152
153 ->pending()
154 Returns the number of items still in the queue. Returns "undef" if
155 the queue has been ended (see below), and there are no more items
156 in the queue.
157
158 ->limit
159 Sets the size of the queue. If set, calls to "enqueue()" will
160 block until the number of pending items in the queue drops below
161 the "limit". The "limit" does not prevent enqueuing items beyond
162 that count:
163
164 my $q = Thread::Queue->new(1, 2);
165 $q->limit = 4;
166 $q->enqueue(3, 4, 5); # Does not block
167 $q->enqueue(6); # Blocks until at least 2 items are
168 # dequeued
169 my $size = $q->limit; # Returns the current limit (may return
170 # 'undef')
171 $q->limit = 0; # Queue size is now unlimited
172
173 Calling any of the dequeue methods with "COUNT" greater than a
174 queue's "limit" will generate an error.
175
176 ->end()
177 Declares that no more items will be added to the queue.
178
179 All threads blocking on "dequeue()" calls will be unblocked with
180 any remaining items in the queue and/or "undef" being returned.
181 Any subsequent calls to "dequeue()" will behave like
182 "dequeue_nb()".
183
184 Once ended, no more items may be placed in the queue.
185
187 The following methods can be used to manipulate items anywhere in a
188 queue.
189
190 To prevent the contents of a queue from being modified by another
191 thread while it is being examined and/or changed, lock the queue inside
192 a local block:
193
194 {
195 lock($q); # Keep other threads from changing the queue's contents
196 my $item = $q->peek();
197 if ($item ...) {
198 ...
199 }
200 }
201 # Queue is now unlocked
202
203 ->peek()
204 ->peek(INDEX)
205 Returns an item from the queue without dequeuing anything.
206 Defaults to the the head of queue (at index position 0) if no index
207 is specified. Negative index values are supported as with arrays
208 (i.e., -1 is the end of the queue, -2 is next to last, and so on).
209
210 If no items exists at the specified index (i.e., the queue is
211 empty, or the index is beyond the number of items on the queue),
212 then "undef" is returned.
213
214 Remember, the returned item is not removed from the queue, so
215 manipulating a "peek"ed at reference affects the item on the queue.
216
217 ->insert(INDEX, LIST)
218 Adds the list of items to the queue at the specified index position
219 (0 is the head of the list). Any existing items at and beyond that
220 position are pushed back past the newly added items:
221
222 $q->enqueue(1, 2, 3, 4);
223 $q->insert(1, qw/foo bar/);
224 # Queue now contains: 1, foo, bar, 2, 3, 4
225
226 Specifying an index position greater than the number of items in
227 the queue just adds the list to the end.
228
229 Negative index positions are supported:
230
231 $q->enqueue(1, 2, 3, 4);
232 $q->insert(-2, qw/foo bar/);
233 # Queue now contains: 1, 2, foo, bar, 3, 4
234
235 Specifying a negative index position greater than the number of
236 items in the queue adds the list to the head of the queue.
237
238 ->extract()
239 ->extract(INDEX)
240 ->extract(INDEX, COUNT)
241 Removes and returns the specified number of items (defaults to 1)
242 from the specified index position in the queue (0 is the head of
243 the queue). When called with no arguments, "extract" operates the
244 same as "dequeue_nb".
245
246 This method is non-blocking, and will return only as many items as
247 are available to fulfill the request:
248
249 $q->enqueue(1, 2, 3, 4);
250 my $item = $q->extract(2) # Returns 3
251 # Queue now contains: 1, 2, 4
252 my @items = $q->extract(1, 3) # Returns (2, 4)
253 # Queue now contains: 1
254
255 Specifying an index position greater than the number of items in
256 the queue results in "undef" or an empty list being returned.
257
258 $q->enqueue('foo');
259 my $nada = $q->extract(3) # Returns undef
260 my @nada = $q->extract(1, 3) # Returns ()
261
262 Negative index positions are supported. Specifying a negative
263 index position greater than the number of items in the queue may
264 return items from the head of the queue (similar to "dequeue_nb")
265 if the count overlaps the head of the queue from the specified
266 position (i.e. if queue size + index + count is greater than zero):
267
268 $q->enqueue(qw/foo bar baz/);
269 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
270 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
271 # Queue now contains: bar, baz
272 my @rest = $q->extract(-3, 4); # Returns (bar, baz) -
273 # (2+(-3)+4) > 0
274
276 Queues created by Thread::Queue can be used in both threaded and non-
277 threaded applications.
278
280 Passing objects on queues may not work if the objects' classes do not
281 support sharing. See "BUGS AND LIMITATIONS" in threads::shared for
282 more.
283
284 Passing array/hash refs that contain objects may not work for Perl
285 prior to 5.10.0.
286
288 Thread::Queue on MetaCPAN: <https://metacpan.org/release/Thread-Queue>
289
290 Code repository for CPAN distribution:
291 <https://github.com/Dual-Life/Thread-Queue>
292
293 threads, threads::shared
294
295 Sample code in the examples directory of this distribution on CPAN.
296
298 Jerry D. Hedden, <jdhedden AT cpan DOT org>
299
301 This program is free software; you can redistribute it and/or modify it
302 under the same terms as Perl itself.
303
304
305
306perl v5.30.0 2019-07-26 Thread::Queue(3)