1MCE::Queue(3) User Contributed Perl Documentation MCE::Queue(3)
2
3
4
6 MCE::Queue - Hybrid (normal and priority) queues
7
9 This document describes MCE::Queue version 1.889
10
12 use MCE;
13 use MCE::Queue;
14
15 my $q = MCE::Queue->new;
16
17 $q->enqueue( qw/ wherefore art thou romeo / );
18
19 my $item = $q->dequeue;
20
21 if ( $q->pending ) {
22 ;
23 }
24
26 This module provides a queue interface supporting normal and priority
27 queues and utilizing the IPC engine behind MCE. Data resides under the
28 manager process. Three options are available for overriding the default
29 value for new queues. The porder option applies to priority queues
30 only.
31
32 use MCE::Queue porder => $MCE::Queue::HIGHEST,
33 type => $MCE::Queue::FIFO;
34
35 use MCE::Queue; # Same as above
36
37 ## Possible values
38
39 porder => $MCE::Queue::HIGHEST # Highest priority items dequeue first
40 $MCE::Queue::LOWEST # Lowest priority items dequeue first
41
42 type => $MCE::Queue::FIFO # First in, first out
43 $MCE::Queue::LIFO # Last in, first out
44 $MCE::Queue::LILO # (Synonym for FIFO)
45 $MCE::Queue::FILO # (Synonym for LIFO)
46
48 MCE::Queue provides two run modes.
49
50 (A) The "MCE::Queue" object is constructed before running MCE. The data
51 resides under the manager process. Workers send and request data via
52 IPC.
53
54 (B) Workers might want to construct a queue for local access. In this
55 mode, the data resides under the worker process and not available to
56 other workers including the manager process.
57
58 use MCE;
59 use MCE::Queue;
60
61 my $F = MCE::Queue->new( fast => 1 );
62 my $consumers = 8;
63
64 my $mce = MCE->new(
65
66 task_end => sub {
67 my ($mce, $task_id, $task_name) = @_;
68 $F->end() if $task_name eq 'dir';
69 },
70
71 user_tasks => [{
72 max_workers => 1, task_name => 'dir',
73
74 user_func => sub {
75 ## Create a "standalone queue" only accessible to this worker.
76 my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);
77
78 while (defined (my $dir = $D->dequeue_nb)) {
79 my (@files, @dirs); foreach (glob("$dir/*")) {
80 if (-d $_) { push @dirs, $_; next; }
81 push @files, $_;
82 }
83 $D->enqueue(@dirs ) if scalar @dirs;
84 $F->enqueue(@files) if scalar @files;
85 }
86 }
87 },{
88 max_workers => $consumers, task_name => 'file',
89
90 user_func => sub {
91 while (defined (my $file = $F->dequeue)) {
92 MCE->say($file);
93 }
94 }
95 }]
96
97 )->run({ user_args => [ $ARGV[0] || '.' ] });
98
99 __END__
100
101 Results taken from files_mce.pl and files_thr.pl on the web.
102 https://github.com/marioroy/mce-examples/tree/master/other
103
104 Usage:
105 time ./files_mce.pl /usr 0 | wc -l
106 time ./files_mce.pl /usr 1 | wc -l
107 time ./files_thr.pl /usr | wc -l
108
109 Darwin (OS) /usr: 216,271 files
110 MCE::Queue, fast => 0 : 4.17s
111 MCE::Queue, fast => 1 : 2.62s
112 Thread::Queue : 4.14s
113
114 Linux (VM) /usr: 186,154 files
115 MCE::Queue, fast => 0 : 12.57s
116 MCE::Queue, fast => 1 : 3.36s
117 Thread::Queue : 5.91s
118
119 Solaris (VM) /usr: 603,051 files
120 MCE::Queue, fast => 0 : 39.04s
121 MCE::Queue, fast => 1 : 18.08s
122 Thread::Queue * Perl not built to support threads
123
125 MCE::Queue->new ( [ queue => \@array, await => 1, fast => 1 ] )
126 This creates a new queue. Available options are queue, porder, type,
127 await, and gather. Note: The barrier and fast options are silentently
128 ignored (no-op) if specified; starting with 1.867.
129
130 use MCE;
131 use MCE::Queue;
132
133 my $q1 = MCE::Queue->new();
134 my $q2 = MCE::Queue->new( queue => [ 0, 1, 2 ] );
135
136 my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
137 my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
138
139 my $q5 = MCE::Queue->new( type => $MCE::Queue::FIFO );
140 my $q6 = MCE::Queue->new( type => $MCE::Queue::LIFO );
141
142 my $q7 = MCE::Queue->new( await => 1, barrier => 0 );
143 my $q8 = MCE::Queue->new( fast => 1 );
144
145 The "await" option, when enabled, allows workers to block (semaphore-
146 like) until the number of items pending is equal to or less than a
147 threshold value. The $q->await method is described below.
148
149 Obsolete: On Unix platforms, "barrier" mode (enabled by default)
150 prevents many workers from dequeuing simultaneously to lessen overhead
151 for the OS kernel. Specify 0 to disable barrier mode and not allocate
152 sockets. The barrier option has no effect if constructing the queue
153 inside a thread or enabling "fast".
154
155 Obsolete: The "fast" option speeds up dequeues and is not enabled by
156 default. It is beneficial for queues not calling (->dequeue_nb) and
157 not altering the count value while running; e.g. ->dequeue($count).
158
159 The "gather" option is mainly for running with MCE and wanting to pass
160 item(s) to a callback function for appending to the queue. Multiple
161 queues may point to the same callback function. The callback receives
162 the queue object as the first argument and items after it.
163
164 sub _append {
165 my ($q, @items) = @_;
166 $q->enqueue(@items);
167 }
168
169 my $q7 = MCE::Queue->new( gather => \&_append );
170 my $q8 = MCE::Queue->new( gather => \&_append );
171
172 ## Items are diverted to the callback function, not the queue.
173 $q7->enqueue( 'apple', 'orange' );
174
175 Specifying the "gather" option allows one to store items temporarily
176 while ensuring output order. Although a queue object is not required,
177 this is simply a demonstration of the gather option in the context of a
178 queue.
179
180 use MCE;
181 use MCE::Queue;
182
183 sub preserve_order {
184 my %tmp; my $order_id = 1;
185
186 return sub {
187 my ($q, $chunk_id, $data) = @_;
188 $tmp{$chunk_id} = $data;
189
190 while (1) {
191 last unless exists $tmp{$order_id};
192 $q->enqueue( delete $tmp{$order_id++} );
193 }
194
195 return;
196 };
197 }
198
199 my @squares; my $q = MCE::Queue->new(
200 queue => \@squares, gather => preserve_order
201 );
202
203 my $mce = MCE->new(
204 chunk_size => 1, input_data => [ 1 .. 100 ],
205 user_func => sub {
206 $q->enqueue( MCE->chunk_id, $_ * $_ );
207 }
208 );
209
210 $mce->run;
211
212 print "@squares\n";
213
214 $q->await ( $pending_threshold )
215 The await method is beneficial when wanting to throttle worker(s)
216 appending to the queue. Perhaps, consumers are running a bit behind and
217 wanting to keep tabs on memory consumption. Below, the number of items
218 pending will never go above 20.
219
220 use Time::HiRes qw( sleep );
221
222 use MCE::Flow;
223 use MCE::Queue;
224
225 my $q = MCE::Queue->new( await => 1, fast => 1 );
226 my ( $producers, $consumers ) = ( 1, 8 );
227
228 mce_flow {
229 task_name => [ 'producer', 'consumer' ],
230 max_workers => [ $producers, $consumers ],
231 },
232 sub {
233 ## producer
234 for my $item ( 1 .. 100 ) {
235 $q->enqueue($item);
236
237 ## blocks until the # of items pending reaches <= 10
238 if ($item % 10 == 0) {
239 MCE->say( 'pending: '.$q->pending() );
240 $q->await(10);
241 }
242 }
243
244 ## notify consumers no more work
245 $q->end();
246
247 },
248 sub {
249 ## consumers
250 while (defined (my $next = $q->dequeue())) {
251 MCE->say( MCE->task_wid().': '.$next );
252 sleep 0.100;
253 }
254 };
255
256 $q->clear ( void )
257 Clears the queue of any items. This has the effect of nulling the queue
258 and the socket used for blocking.
259
260 my @a; my $q = MCE::Queue->new( queue => \@a );
261
262 @a = (); ## bad, the blocking socket may become out of sync
263 $q->clear; ## ok
264
265 $q->end ( void )
266 Stops the queue from receiving more items. Any worker blocking on
267 "dequeue" will be unblocked automatically. Subsequent calls to
268 "dequeue" will behave like "dequeue_nb". Current API available since
269 MCE 1.818.
270
271 $q->end();
272
273 MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one
274 might want to enqueue "undef"'s versus calling "end". The number of
275 "undef"'s depends on how many items workers dequeue at a time.
276
277 $q->enqueue((undef) x ($N_workers * 1)); # $q->dequeue() 1 item
278 $q->enqueue((undef) x ($N_workers * 2)); # $q->dequeue(2) 2 items
279 $q->enqueue((undef) x ($N_workers * N)); # $q->dequeue(N) N items
280
281 $q->enqueue ( $item [, $item, ... ] )
282 Appends a list of items onto the end of the normal queue.
283
284 $q->enqueue( 'foo' );
285 $q->enqueue( 'bar', 'baz' );
286
287 $q->enqueuep ( $p, $item [, $item, ... ] )
288 Appends a list of items onto the end of the priority queue with
289 priority.
290
291 $q->enqueue( $priority, 'foo' );
292 $q->enqueue( $priority, 'bar', 'baz' );
293
294 $q->dequeue ( [ $count ] )
295 Returns the requested number of items (default 1) from the queue.
296 Priority data will always dequeue first before any data from the normal
297 queue.
298
299 $q->dequeue;
300 $q->dequeue( 2 );
301
302 The method will block if the queue contains zero items. If the queue
303 contains fewer than the requested number of items, the method will not
304 block, but return whatever items there are on the queue.
305
306 The $count, used for requesting the number of items, is beneficial when
307 workers are passing parameters through the queue. For this reason,
308 always remember to dequeue using the same multiple for the count. This
309 is unlike Thread::Queue which will block until the requested number of
310 items are available.
311
312 # MCE::Queue 1.820 and prior releases
313 while ( my @items = $q->dequeue(2) ) {
314 last unless ( defined $items[0] );
315 ...
316 }
317
318 # MCE::Queue 1.821 and later
319 while ( my @items = $q->dequeue(2) ) {
320 ...
321 }
322
323 $q->dequeue_nb ( [ $count ] )
324 Returns the requested number of items (default 1) from the queue. Like
325 with dequeue, priority data will always dequeue first. This method is
326 non-blocking and returns "undef" in the absence of data.
327
328 $q->dequeue_nb;
329 $q->dequeue_nb( 2 );
330
331 $q->dequeue_timed ( timeout [, $count ] )
332 Returns the requested number of items (default 1) from the queue. Like
333 with dequeue, priority data will always dequeue first. This method is
334 blocking until the timeout is reached and returns "undef" in the
335 absence of data. Current API available since MCE 1.886.
336
337 $q->dequeue_timed( 300 ); # timeout after 5 minutes
338 $q->dequeue_timed( 300, 2 );
339
340 The timeout may be specified as fractional seconds. If timeout is
341 missing, undef, less than or equal to 0, or called by the manager
342 process, then this call behaves like dequeue_nb.
343
344 $q->insert ( $index, $item [, $item, ... ] )
345 Adds the list of items to the queue at the specified index position (0
346 is the head of the list). The head of the queue is that item which
347 would be removed by a call to dequeue.
348
349 $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
350 $q->enqueue(1, 2, 3, 4);
351 $q->insert(1, 'foo', 'bar');
352 # Queue now contains: 1, foo, bar, 2, 3, 4
353
354 $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
355 $q->enqueue(1, 2, 3, 4);
356 $q->insert(1, 'foo', 'bar');
357 # Queue now contains: 1, 2, 3, 'foo', 'bar', 4
358
359 $q->insertp ( $p, $index, $item [, $item, ... ] )
360 Adds the list of items to the queue at the specified index position
361 with priority. The behavior is similarly to "$q->insert" otherwise.
362
363 $q->pending ( void )
364 Returns the number of items in the queue. The count includes both
365 normal and priority data. Returns "undef" if the queue has been ended,
366 and there are no more items in the queue.
367
368 $q = MCE::Queue->new();
369 $q->enqueuep(5, 'foo', 'bar');
370 $q->enqueue('sunny', 'day');
371
372 print $q->pending(), "\n";
373 # Output: 4
374
375 $q->peek ( [ $index ] )
376 Returns an item from the normal queue, at the specified index, without
377 dequeuing anything. It defaults to the head of the queue if index is
378 not specified. The head of the queue is that item which would be
379 removed by a call to dequeue. Negative index values are supported,
380 similarly to arrays.
381
382 $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
383 $q->enqueue(1, 2, 3, 4, 5);
384
385 print $q->peek(1), ' ', $q->peek(-2), "\n";
386 # Output: 2 4
387
388 $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
389 $q->enqueue(1, 2, 3, 4, 5);
390
391 print $q->peek(1), ' ', $q->peek(-2), "\n";
392 # Output: 4 2
393
394 $q->peekp ( $p [, $index ] )
395 Returns an item from the queue with priority, at the specified index,
396 without dequeuing anything. It defaults to the head of the queue if
397 index is not specified. The behavior is similarly to "$q->peek"
398 otherwise.
399
400 $q->peekh ( [ $index ] )
401 Returns an item from the head of the heap or at the specified index.
402
403 $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
404 $q->enqueuep(5, 'foo');
405 $q->enqueuep(6, 'bar');
406 $q->enqueuep(4, 'sun');
407
408 print $q->peekh(0), "\n";
409 # Output: 6
410
411 $q = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
412 $q->enqueuep(5, 'foo');
413 $q->enqueuep(6, 'bar');
414 $q->enqueuep(4, 'sun');
415
416 print $q->peekh(0), "\n";
417 # Output: 4
418
419 $q->heap ( void )
420 Returns an array containing the heap data. Heap data consists of
421 priority numbers, not the data.
422
423 @h = $q->heap; # $MCE::Queue::HIGHEST
424 # Heap contains: 6, 5, 4
425
426 @h = $q->heap; # $MCE::Queue::LOWEST
427 # Heap contains: 4, 5, 6
428
430 • List::BinarySearch
431
432 The bsearch_num_pos method was helpful for accommodating the highest
433 and lowest order in MCE::Queue.
434
435 • POE::Queue::Array
436
437 For extra optimization, two if statements were adopted for checking
438 if the item belongs at the end or head of the queue.
439
440 • List::Priority
441
442 MCE::Queue supports both normal and priority queues.
443
444 • Thread::Queue
445
446 Thread::Queue is used as a template for identifying and documenting
447 the methods.
448
449 MCE::Queue is not fully compatible due to supporting normal and
450 priority queues simultaneously; e.g.
451
452 $q->enqueue( $item [, $item, ... ] ); # normal queue
453 $q->enqueuep( $p, $item [, $item, ... ] ); # priority queue
454
455 $q->dequeue( [ $count ] ); # priority data dequeues first
456 $q->dequeue_nb( [ $count ] );
457
458 $q->pending(); # counts both normal/priority queues
459
460 • Parallel::DataPipe
461
462 The recursion example, in the synopsis above, was largely adopted
463 from this module.
464
466 MCE, MCE::Core
467
469 Mario E. Roy, <marioeroy AT gmail DOT com>
470
471
472
473perl v5.38.0 2023-09-14 MCE::Queue(3)