MCE::Queue(3)         User Contributed Perl Documentation        MCE::Queue(3)

MCE::Queue - Hybrid (normal and priority) queues

This document describes MCE::Queue version 1.866

    use MCE;
    use MCE::Queue;

    my $q = MCE::Queue->new;

    $q->enqueue( qw/ wherefore art thou romeo / );

    my $item = $q->dequeue;

    if ( $q->pending ) {
        ;
    }

This module provides a queue interface supporting normal and priority
queues and utilizing the IPC engine behind MCE. Data resides under the
manager process. Three options are available for overriding the default
values for new queues. The porder option applies to priority queues
only.

    use MCE::Queue porder => $MCE::Queue::HIGHEST,
                   type   => $MCE::Queue::FIFO,
                   fast   => 0;

    use MCE::Queue;  # Same as above

    ## Possible values

    porder => $MCE::Queue::HIGHEST  # Highest priority items dequeue first
              $MCE::Queue::LOWEST   # Lowest priority items dequeue first

    type   => $MCE::Queue::FIFO     # First in, first out
              $MCE::Queue::LIFO     # Last in, first out
              $MCE::Queue::LILO     # (Synonym for FIFO)
              $MCE::Queue::FILO     # (Synonym for LIFO)

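As a minimal sketch of how the type and porder values affect dequeue
order, the following builds one queue per setting in a single process
and drains each with dequeue_nb. The drain helper and the item values
are illustrative assumptions, not part of MCE::Queue.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

# Hypothetical helper: remove every item without blocking.
sub drain {
    my ($q) = @_;
    my @items;
    while ( defined( my $item = $q->dequeue_nb ) ) {
        push @items, $item;
    }
    return @items;
}

my $fifo = MCE::Queue->new( type => $MCE::Queue::FIFO );
my $lifo = MCE::Queue->new( type => $MCE::Queue::LIFO );
$_->enqueue( 1 .. 4 ) for $fifo, $lifo;

my @fifo_order = drain($fifo);    # first in, first out
my @lifo_order = drain($lifo);    # last in, first out

# With LOWEST, the smallest priority number dequeues first.
my $low = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
$low->enqueuep( 5, 'five' );
$low->enqueuep( 2, 'two' );

my @low_order = drain($low);

print "@fifo_order | @lifo_order | @low_order\n";
```
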
MCE::Queue provides two run modes.

(A) The "MCE::Queue" object is constructed before running MCE. The data
resides under the manager process. Workers send and request data via
IPC.

(B) Workers might want to construct a queue for local access. In this
mode, the data resides under the worker process and is not available to
other workers or to the manager process.

    use MCE;
    use MCE::Queue;

    my $F = MCE::Queue->new( fast => 1 );
    my $consumers = 8;

    my $mce = MCE->new(

        task_end => sub {
            my ($mce, $task_id, $task_name) = @_;
            $F->end() if $task_name eq 'dir';
        },

        user_tasks => [{
            max_workers => 1, task_name => 'dir',

            user_func => sub {
                ## Create a "standalone queue" only accessible to this worker.
                my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);

                while (defined (my $dir = $D->dequeue_nb)) {
                    my (@files, @dirs); foreach (glob("$dir/*")) {
                        if (-d $_) { push @dirs, $_; next; }
                        push @files, $_;
                    }
                    $D->enqueue(@dirs ) if scalar @dirs;
                    $F->enqueue(@files) if scalar @files;
                }
            }
        },{
            max_workers => $consumers, task_name => 'file',

            user_func => sub {
                while (defined (my $file = $F->dequeue)) {
                    MCE->say($file);
                }
            }
        }]

    )->run({ user_args => [ $ARGV[0] || '.' ] });

    __END__

    Results taken from files_mce.pl and files_thr.pl on the web.
    https://github.com/marioroy/mce-examples/tree/master/other

    Usage:
        time ./files_mce.pl /usr 0 | wc -l
        time ./files_mce.pl /usr 1 | wc -l
        time ./files_thr.pl /usr   | wc -l

    Darwin (OS) /usr: 216,271 files
        MCE::Queue, fast => 0 :  4.17s
        MCE::Queue, fast => 1 :  2.62s
        Thread::Queue         :  4.14s

    Linux (VM) /usr: 186,154 files
        MCE::Queue, fast => 0 : 12.57s
        MCE::Queue, fast => 1 :  3.36s
        Thread::Queue         :  5.91s

    Solaris (VM) /usr: 603,051 files
        MCE::Queue, fast => 0 : 39.04s
        MCE::Queue, fast => 1 : 18.08s
        Thread::Queue         * Perl not built to support threads

MCE::Queue->new ( [ queue => \@array, await => 1, fast => 1 ] )
This creates a new queue. Available options are queue, porder, type,
await, barrier, fast, and gather.

    use MCE;
    use MCE::Queue;

    my $q1 = MCE::Queue->new();
    my $q2 = MCE::Queue->new( queue => [ 0, 1, 2 ] );

    my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
    my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST  );

    my $q5 = MCE::Queue->new( type => $MCE::Queue::FIFO );
    my $q6 = MCE::Queue->new( type => $MCE::Queue::LIFO );

    my $q7 = MCE::Queue->new( await => 1, barrier => 0 );
    my $q8 = MCE::Queue->new( fast => 1 );

The "await" option, when enabled, allows workers to block
(semaphore-like) until the number of items pending is equal to or less
than a threshold value. The $q->await method is described below.

On Unix platforms, "barrier" mode (enabled by default) prevents many
workers from dequeuing simultaneously to lessen overhead for the OS
kernel. Specify 0 to disable barrier mode and not allocate sockets. The
barrier option has no effect if constructing the queue inside a thread
or enabling "fast".

The "fast" option speeds up dequeues and is not enabled by default. It
benefits queues that never call dequeue_nb and that keep the count
value constant while running; e.g. ->dequeue($count).

The "gather" option is mainly for use with MCE when items should be
passed to a callback function for appending to the queue. Multiple
queues may point to the same callback function. The callback receives
the queue object as the first argument, followed by the items.

    sub _append {
        my ($q, @items) = @_;
        $q->enqueue(@items);
    }

    my $q7 = MCE::Queue->new( gather => \&_append );
    my $q8 = MCE::Queue->new( gather => \&_append );

    ## Items are diverted to the callback function, not the queue.
    $q7->enqueue( 'apple', 'orange' );

Specifying the "gather" option allows one to store items temporarily
while ensuring output order. Although a queue object is not required,
this is simply a demonstration of the gather option in the context of a
queue.

    use MCE;
    use MCE::Queue;

    sub preserve_order {
        my %tmp; my $order_id = 1;

        return sub {
            my ($q, $chunk_id, $data) = @_;
            $tmp{$chunk_id} = $data;

            while (1) {
                last unless exists $tmp{$order_id};
                $q->enqueue( delete $tmp{$order_id++} );
            }

            return;
        };
    }

    my @squares; my $q = MCE::Queue->new(
        queue => \@squares, gather => preserve_order
    );

    my $mce = MCE->new(
        chunk_size => 1, input_data => [ 1 .. 100 ],
        user_func => sub {
            $q->enqueue( MCE->chunk_id, $_ * $_ );
        }
    );

    $mce->run;

    print "@squares\n";

$q->await ( $pending_threshold )
The await method helps throttle workers appending to the queue, for
example when consumers are running behind and memory consumption must
be kept in check. Below, the number of items pending will never go
above 20.

    use Time::HiRes qw( sleep );

    use MCE::Flow;
    use MCE::Queue;

    my $q = MCE::Queue->new( await => 1, fast => 1 );
    my ( $producers, $consumers ) = ( 1, 8 );

    mce_flow {
        task_name   => [ 'producer', 'consumer' ],
        max_workers => [ $producers, $consumers ],
    },
    sub {
        ## producer
        for my $item ( 1 .. 100 ) {
            $q->enqueue($item);

            ## blocks until the # of items pending reaches <= 10
            if ($item % 10 == 0) {
                MCE->say( 'pending: '.$q->pending() );
                $q->await(10);
            }
        }

        ## notify consumers no more work
        $q->end();

    },
    sub {
        ## consumers
        while (defined (my $next = $q->dequeue())) {
            MCE->say( MCE->task_wid().': '.$next );
            sleep 0.100;
        }
    };

$q->clear ( void )
Clears the queue of any items. This has the effect of nulling the queue
and the socket used for blocking.

    my @a; my $q = MCE::Queue->new( queue => \@a );

    @a = ();     ## bad, the blocking socket may become out of sync
    $q->clear;   ## ok

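A small single-process sketch of clear follows. It assumes the queue is
used from the manager process only; the item values are arbitrary. The
queue remains usable after being cleared.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( queue => [ 'a', 'b', 'c' ] );

my $before = $q->pending;     # three items are waiting
$q->clear;
my $after  = $q->pending;     # nothing left after clear

# The cleared queue accepts new items as usual.
$q->enqueue('d');
my $next = $q->dequeue_nb;

print "before=$before after=$after next=$next\n";
```
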
$q->end ( void )
Stops the queue from receiving more items. Any worker blocking on
"dequeue" will be unblocked automatically. Subsequent calls to
"dequeue" will behave like "dequeue_nb". Current API available since
MCE 1.818.

    $q->end();

MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one
might want to enqueue "undef" values instead of calling "end". The
number of "undef" values depends on how many items workers dequeue at a
time.

    $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
    $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
    $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items

$q->enqueue ( $item [, $item, ... ] )
Appends a list of items onto the end of the normal queue.

    $q->enqueue( 'foo' );
    $q->enqueue( 'bar', 'baz' );

$q->enqueuep ( $p, $item [, $item, ... ] )
Appends a list of items onto the end of the priority queue with
priority.

    $q->enqueuep( $priority, 'foo' );
    $q->enqueuep( $priority, 'bar', 'baz' );

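To make the interplay between enqueue and enqueuep concrete, here is a
minimal single-process sketch. It assumes the default porder (HIGHEST)
and uses dequeue_nb so that nothing blocks; the priorities 1 and 10 and
the item names are arbitrary.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );

$q->enqueue( 'normal' );        # normal queue
$q->enqueuep( 1,  'low'  );     # priority queue, priority 1
$q->enqueuep( 10, 'high' );     # priority queue, priority 10

my $pending = $q->pending;      # counts normal and priority items

# Priority items come out first, highest priority number leading.
my @order;
while ( defined( my $item = $q->dequeue_nb ) ) {
    push @order, $item;
}

print "$pending: @order\n";
```
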
$q->dequeue ( [ $count ] )
Returns the requested number of items (default 1) from the queue.
Priority data will always dequeue first before any data from the normal
queue.

    $q->dequeue( 2 );
    $q->dequeue;  # default 1

The method will block if the queue contains zero items. If the queue
contains fewer than the requested number of items, the method will not
block, but return whatever items there are on the queue.

The $count, used for requesting the number of items, is beneficial when
workers are passing parameters through the queue. For this reason,
always remember to dequeue using the same multiple for the count. This
differs from Thread::Queue, which blocks until the requested number of
items is available.

    # MCE::Queue 1.820 and prior releases
    while ( my @items = $q->dequeue(2) ) {
        last unless ( defined $items[0] );
        ...
    }

    # MCE::Queue 1.821 and later
    while ( my @items = $q->dequeue(2) ) {
        ...
    }

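The advice about keeping the count a consistent multiple can be
sketched with items enqueued as (key, value) pairs and removed two at a
time. This is an illustration only: it runs in a single process and
substitutes dequeue_nb for dequeue so that it cannot block, and the
keys and values are made up.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new;

# Parameters travel through the queue as (key, value) pairs.
$q->enqueue( name  => 'romeo'    );
$q->enqueue( house => 'montague' );

my %params;
while ( $q->pending ) {
    # Always take items in the same multiple they were added in.
    my ( $key, $value ) = $q->dequeue_nb(2);
    $params{$key} = $value;
}

print join( ', ', map { "$_=$params{$_}" } sort keys %params ), "\n";
```
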
$q->dequeue_nb ( [ $count ] )
Returns the requested number of items (default 1) from the queue. As
with dequeue, priority data will always dequeue first. This method is
non-blocking and returns "undef" in the absence of data.

    $q->dequeue_nb( 2 );
    $q->dequeue_nb;  # default 1

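The non-blocking behaviour can be sketched as follows, assuming a queue
used from a single process. An empty queue yields "undef" immediately
instead of waiting for data.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new;

my $first = $q->dequeue_nb;      # queue is empty: returns at once

$q->enqueue('romeo');
my $second = $q->dequeue_nb;     # the item just added
my $third  = $q->dequeue_nb;     # empty again

printf "%s %s %s\n",
    map { defined $_ ? $_ : 'undef' } $first, $second, $third;
```
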
$q->insert ( $index, $item [, $item, ... ] )
Adds the list of items to the queue at the specified index position (0
is the head of the list). The head of the queue is that item which
would be removed by a call to dequeue.

    $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
    $q->enqueue(1, 2, 3, 4);
    $q->insert(1, 'foo', 'bar');
    # Queue now contains: 1, foo, bar, 2, 3, 4

    $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
    $q->enqueue(1, 2, 3, 4);
    $q->insert(1, 'foo', 'bar');
    # Queue now contains: 1, 2, 3, foo, bar, 4

$q->insertp ( $p, $index, $item [, $item, ... ] )
Adds the list of items to the queue at the specified index position
with priority. The behavior is similar to "$q->insert" otherwise.

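No example accompanies insertp above, so here is a minimal sketch
modelled on the FIFO insert example. The priority 5 and the item names
are arbitrary, and the queue is drained with dequeue_nb in a single
process.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( type => $MCE::Queue::FIFO );

$q->enqueuep( 5, 'a', 'b', 'c' );
$q->insertp( 5, 1, 'x', 'y' );   # index 1 within priority 5

my @order;
while ( defined( my $item = $q->dequeue_nb ) ) {
    push @order, $item;
}

print "@order\n";
```
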
$q->pending ( void )
Returns the number of items in the queue. The count includes both
normal and priority data. Returns "undef" if the queue has been ended,
and there are no more items in the queue.

    $q = MCE::Queue->new();
    $q->enqueuep(5, 'foo', 'bar');
    $q->enqueue('sunny', 'day');

    print $q->pending(), "\n";
    # Output: 4

$q->peek ( [ $index ] )
Returns an item from the normal queue, at the specified index, without
dequeuing anything. It defaults to the head of the queue if index is
not specified. The head of the queue is that item which would be
removed by a call to dequeue. Negative index values are supported, as
with arrays.

    $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
    $q->enqueue(1, 2, 3, 4, 5);

    print $q->peek(1), ' ', $q->peek(-2), "\n";
    # Output: 2 4

    $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
    $q->enqueue(1, 2, 3, 4, 5);

    print $q->peek(1), ' ', $q->peek(-2), "\n";
    # Output: 4 2

$q->peekp ( $p [, $index ] )
Returns an item from the queue with priority, at the specified index,
without dequeuing anything. It defaults to the head of the queue if
index is not specified. The behavior is similar to "$q->peek"
otherwise.

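A short sketch of peekp on a FIFO queue follows. The priorities 5 and 7
and the item names are arbitrary; nothing is removed from the queue, so
the pending count is unchanged afterwards.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( type => $MCE::Queue::FIFO );

$q->enqueuep( 5, 'foo', 'bar', 'baz' );
$q->enqueuep( 7, 'sun' );

my $head5  = $q->peekp(5);       # head of priority 5
my $second = $q->peekp(5, 1);    # index 1 of priority 5
my $head7  = $q->peekp(7);       # head of priority 7

my $pending = $q->pending;       # peeking dequeues nothing

print "$head5 $second $head7 ($pending pending)\n";
```
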
$q->peekh ( [ $index ] )
Returns an item from the head of the heap or at the specified index.

    $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
    $q->enqueuep(5, 'foo');
    $q->enqueuep(6, 'bar');
    $q->enqueuep(4, 'sun');

    print $q->peekh(0), "\n";
    # Output: 6

    $q = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
    $q->enqueuep(5, 'foo');
    $q->enqueuep(6, 'bar');
    $q->enqueuep(4, 'sun');

    print $q->peekh(0), "\n";
    # Output: 4

$q->heap ( void )
Returns an array containing the heap data. Heap data consists of
priority numbers, not the data.

    @h = $q->heap;  # $MCE::Queue::HIGHEST
    # Heap contains: 6, 5, 4

    @h = $q->heap;  # $MCE::Queue::LOWEST
    # Heap contains: 4, 5, 6

·   List::BinarySearch

    The bsearch_num_pos method was helpful for accommodating the highest
    and lowest order in MCE::Queue.

·   POE::Queue::Array

    For extra optimization, two if statements were adopted for checking
    if the item belongs at the end or head of the queue.

·   List::Priority

    MCE::Queue supports both normal and priority queues.

·   Thread::Queue

    Thread::Queue is used as a template for identifying and documenting
    the methods.

    MCE::Queue is not fully compatible due to supporting normal and
    priority queues simultaneously; e.g.

        $q->enqueue( $item [, $item, ... ] );       # normal queue
        $q->enqueuep( $p, $item [, $item, ... ] );  # priority queue

        $q->dequeue( [ $count ] );      # priority data dequeues first
        $q->dequeue_nb( [ $count ] );

        $q->pending();                  # counts both normal/priority queues

·   Parallel::DataPipe

    The recursion example, in the synopsis above, was largely adopted
    from this module.

MCE, MCE::Core

Mario E. Roy, <marioeroy AT gmail DOT com>

perl v5.30.1                      2020-02-09                     MCE::Queue(3)