1MCE::Queue(3) User Contributed Perl Documentation MCE::Queue(3)
2
3
4
6 MCE::Queue - Hybrid (normal and priority) queues
7
9 This document describes MCE::Queue version 1.878
10
12 use MCE;
13 use MCE::Queue;
14
15 my $q = MCE::Queue->new;
16
17 $q->enqueue( qw/ wherefore art thou romeo / );
18
19 my $item = $q->dequeue;
20
21 if ( $q->pending ) {
22 ;
23 }
24
26 This module provides a queue interface supporting normal and priority
27 queues and utilizing the IPC engine behind MCE. Data resides under the
28 manager process. Three options are available for overriding the default
29 value for new queues. The porder option applies to priority queues
30 only.
31
32 use MCE::Queue porder => $MCE::Queue::HIGHEST,
33 type => $MCE::Queue::FIFO;
34
35 use MCE::Queue; # Same as above
36
37 ## Possible values
38
39 porder => $MCE::Queue::HIGHEST # Highest priority items dequeue first
40 $MCE::Queue::LOWEST # Lowest priority items dequeue first
41
42 type => $MCE::Queue::FIFO # First in, first out
43 $MCE::Queue::LIFO # Last in, first out
44 $MCE::Queue::LILO # (Synonym for FIFO)
45 $MCE::Queue::FILO # (Synonym for LIFO)
46
48 MCE::Queue provides two run modes.
49
50 (A) The "MCE::Queue" object is constructed before running MCE. The data
51 resides under the manager process. Workers send and request data via
52 IPC.
53
54 (B) Workers might want to construct a queue for local access. In this
55 mode, the data resides under the worker process and not available to
56 other workers including the manager process.
57
58 use MCE;
59 use MCE::Queue;
60
61 my $F = MCE::Queue->new( fast => 1 );
62 my $consumers = 8;
63
64 my $mce = MCE->new(
65
66 task_end => sub {
67 my ($mce, $task_id, $task_name) = @_;
68 $F->end() if $task_name eq 'dir';
69 },
70
71 user_tasks => [{
72 max_workers => 1, task_name => 'dir',
73
74 user_func => sub {
75 ## Create a "standalone queue" only accessible to this worker.
76 my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);
77
78 while (defined (my $dir = $D->dequeue_nb)) {
79 my (@files, @dirs); foreach (glob("$dir/*")) {
80 if (-d $_) { push @dirs, $_; next; }
81 push @files, $_;
82 }
83 $D->enqueue(@dirs ) if scalar @dirs;
84 $F->enqueue(@files) if scalar @files;
85 }
86 }
87 },{
88 max_workers => $consumers, task_name => 'file',
89
90 user_func => sub {
91 while (defined (my $file = $F->dequeue)) {
92 MCE->say($file);
93 }
94 }
95 }]
96
97 )->run({ user_args => [ $ARGV[0] || '.' ] });
98
99 __END__
100
101 Results taken from files_mce.pl and files_thr.pl on the web.
102 https://github.com/marioroy/mce-examples/tree/master/other
103
104 Usage:
105 time ./files_mce.pl /usr 0 | wc -l
106 time ./files_mce.pl /usr 1 | wc -l
107 time ./files_thr.pl /usr | wc -l
108
109 Darwin (OS) /usr: 216,271 files
110 MCE::Queue, fast => 0 : 4.17s
111 MCE::Queue, fast => 1 : 2.62s
112 Thread::Queue : 4.14s
113
114 Linux (VM) /usr: 186,154 files
115 MCE::Queue, fast => 0 : 12.57s
116 MCE::Queue, fast => 1 : 3.36s
117 Thread::Queue : 5.91s
118
119 Solaris (VM) /usr: 603,051 files
120 MCE::Queue, fast => 0 : 39.04s
121 MCE::Queue, fast => 1 : 18.08s
122 Thread::Queue * Perl not built to support threads
123
125 MCE::Queue->new ( [ queue => \@array, await => 1, fast => 1 ] )
126 This creates a new queue. Available options are queue, porder, type,
127 await, and gather. Note: The barrier and fast options are silentently
128 ignored (no-op) if specified; starting with 1.867.
129
130 use MCE;
131 use MCE::Queue;
132
133 my $q1 = MCE::Queue->new();
134 my $q2 = MCE::Queue->new( queue => [ 0, 1, 2 ] );
135
136 my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
137 my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
138
139 my $q5 = MCE::Queue->new( type => $MCE::Queue::FIFO );
140 my $q6 = MCE::Queue->new( type => $MCE::Queue::LIFO );
141
142 my $q7 = MCE::Queue->new( await => 1, barrier => 0 );
143 my $q8 = MCE::Queue->new( fast => 1 );
144
145 The "await" option, when enabled, allows workers to block (semaphore-
146 like) until the number of items pending is equal to or less than a
147 threshold value. The $q->await method is described below.
148
149 Obsolete: On Unix platforms, "barrier" mode (enabled by default)
150 prevents many workers from dequeuing simultaneously to lessen overhead
151 for the OS kernel. Specify 0 to disable barrier mode and not allocate
152 sockets. The barrier option has no effect if constructing the queue
153 inside a thread or enabling "fast".
154
155 Obsolete: The "fast" option speeds up dequeues and is not enabled by
156 default. It is beneficial for queues not calling (->dequeue_nb) and
157 not altering the count value while running; e.g. ->dequeue($count).
158
159 The "gather" option is mainly for running with MCE and wanting to pass
160 item(s) to a callback function for appending to the queue. Multiple
161 queues may point to the same callback function. The callback receives
162 the queue object as the first argument and items after it.
163
164 sub _append {
165 my ($q, @items) = @_;
166 $q->enqueue(@items);
167 }
168
169 my $q7 = MCE::Queue->new( gather => \&_append );
170 my $q8 = MCE::Queue->new( gather => \&_append );
171
172 ## Items are diverted to the callback function, not the queue.
173 $q7->enqueue( 'apple', 'orange' );
174
175 Specifying the "gather" option allows one to store items temporarily
176 while ensuring output order. Although a queue object is not required,
177 this is simply a demonstration of the gather option in the context of a
178 queue.
179
180 use MCE;
181 use MCE::Queue;
182
183 sub preserve_order {
184 my %tmp; my $order_id = 1;
185
186 return sub {
187 my ($q, $chunk_id, $data) = @_;
188 $tmp{$chunk_id} = $data;
189
190 while (1) {
191 last unless exists $tmp{$order_id};
192 $q->enqueue( delete $tmp{$order_id++} );
193 }
194
195 return;
196 };
197 }
198
199 my @squares; my $q = MCE::Queue->new(
200 queue => \@squares, gather => preserve_order
201 );
202
203 my $mce = MCE->new(
204 chunk_size => 1, input_data => [ 1 .. 100 ],
205 user_func => sub {
206 $q->enqueue( MCE->chunk_id, $_ * $_ );
207 }
208 );
209
210 $mce->run;
211
212 print "@squares\n";
213
214 $q->await ( $pending_threshold )
215 The await method is beneficial when wanting to throttle worker(s)
216 appending to the queue. Perhaps, consumers are running a bit behind and
217 wanting to keep tabs on memory consumption. Below, the number of items
218 pending will never go above 20.
219
220 use Time::HiRes qw( sleep );
221
222 use MCE::Flow;
223 use MCE::Queue;
224
225 my $q = MCE::Queue->new( await => 1, fast => 1 );
226 my ( $producers, $consumers ) = ( 1, 8 );
227
228 mce_flow {
229 task_name => [ 'producer', 'consumer' ],
230 max_workers => [ $producers, $consumers ],
231 },
232 sub {
233 ## producer
234 for my $item ( 1 .. 100 ) {
235 $q->enqueue($item);
236
237 ## blocks until the # of items pending reaches <= 10
238 if ($item % 10 == 0) {
239 MCE->say( 'pending: '.$q->pending() );
240 $q->await(10);
241 }
242 }
243
244 ## notify consumers no more work
245 $q->end();
246
247 },
248 sub {
249 ## consumers
250 while (defined (my $next = $q->dequeue())) {
251 MCE->say( MCE->task_wid().': '.$next );
252 sleep 0.100;
253 }
254 };
255
256 $q->clear ( void )
257 Clears the queue of any items. This has the effect of nulling the queue
258 and the socket used for blocking.
259
260 my @a; my $q = MCE::Queue->new( queue => \@a );
261
262 @a = (); ## bad, the blocking socket may become out of sync
263 $q->clear; ## ok
264
265 $q->end ( void )
266 Stops the queue from receiving more items. Any worker blocking on
267 "dequeue" will be unblocked automatically. Subsequent calls to
268 "dequeue" will behave like "dequeue_nb". Current API available since
269 MCE 1.818.
270
271 $q->end();
272
273 MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one
274 might want to enqueue "undef"'s versus calling "end". The number of
275 "undef"'s depends on how many items workers dequeue at a time.
276
277 $q->enqueue((undef) x ($N_workers * 1)); # $q->dequeue() 1 item
278 $q->enqueue((undef) x ($N_workers * 2)); # $q->dequeue(2) 2 items
279 $q->enqueue((undef) x ($N_workers * N)); # $q->dequeue(N) N items
280
281 $q->enqueue ( $item [, $item, ... ] )
282 Appends a list of items onto the end of the normal queue.
283
284 $q->enqueue( 'foo' );
285 $q->enqueue( 'bar', 'baz' );
286
287 $q->enqueuep ( $p, $item [, $item, ... ] )
288 Appends a list of items onto the end of the priority queue with
289 priority.
290
291 $q->enqueue( $priority, 'foo' );
292 $q->enqueue( $priority, 'bar', 'baz' );
293
294 $q->dequeue ( [ $count ] )
295 Returns the requested number of items (default 1) from the queue.
296 Priority data will always dequeue first before any data from the normal
297 queue.
298
299 $q->dequeue( 2 );
300 $q->dequeue; # default 1
301
302 The method will block if the queue contains zero items. If the queue
303 contains fewer than the requested number of items, the method will not
304 block, but return whatever items there are on the queue.
305
306 The $count, used for requesting the number of items, is beneficial when
307 workers are passing parameters through the queue. For this reason,
308 always remember to dequeue using the same multiple for the count. This
309 is unlike Thread::Queue which will block until the requested number of
310 items are available.
311
312 # MCE::Queue 1.820 and prior releases
313 while ( my @items = $q->dequeue(2) ) {
314 last unless ( defined $items[0] );
315 ...
316 }
317
318 # MCE::Queue 1.821 and later
319 while ( my @items = $q->dequeue(2) ) {
320 ...
321 }
322
323 $q->dequeue_nb ( [ $count ] )
324 Returns the requested number of items (default 1) from the queue. Like
325 with dequeue, priority data will always dequeue first. This method is
326 non-blocking and returns "undef" in the absence of data.
327
328 $q->dequeue_nb( 2 );
329 $q->dequeue_nb; # default 1
330
331 $q->insert ( $index, $item [, $item, ... ] )
332 Adds the list of items to the queue at the specified index position (0
333 is the head of the list). The head of the queue is that item which
334 would be removed by a call to dequeue.
335
336 $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
337 $q->enqueue(1, 2, 3, 4);
338 $q->insert(1, 'foo', 'bar');
339 # Queue now contains: 1, foo, bar, 2, 3, 4
340
341 $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
342 $q->enqueue(1, 2, 3, 4);
343 $q->insert(1, 'foo', 'bar');
344 # Queue now contains: 1, 2, 3, 'foo', 'bar', 4
345
346 $q->insertp ( $p, $index, $item [, $item, ... ] )
347 Adds the list of items to the queue at the specified index position
348 with priority. The behavior is similarly to "$q->insert" otherwise.
349
350 $q->pending ( void )
351 Returns the number of items in the queue. The count includes both
352 normal and priority data. Returns "undef" if the queue has been ended,
353 and there are no more items in the queue.
354
355 $q = MCE::Queue->new();
356 $q->enqueuep(5, 'foo', 'bar');
357 $q->enqueue('sunny', 'day');
358
359 print $q->pending(), "\n";
360 # Output: 4
361
362 $q->peek ( [ $index ] )
363 Returns an item from the normal queue, at the specified index, without
364 dequeuing anything. It defaults to the head of the queue if index is
365 not specified. The head of the queue is that item which would be
366 removed by a call to dequeue. Negative index values are supported,
367 similarly to arrays.
368
369 $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
370 $q->enqueue(1, 2, 3, 4, 5);
371
372 print $q->peek(1), ' ', $q->peek(-2), "\n";
373 # Output: 2 4
374
375 $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
376 $q->enqueue(1, 2, 3, 4, 5);
377
378 print $q->peek(1), ' ', $q->peek(-2), "\n";
379 # Output: 4 2
380
381 $q->peekp ( $p [, $index ] )
382 Returns an item from the queue with priority, at the specified index,
383 without dequeuing anything. It defaults to the head of the queue if
384 index is not specified. The behavior is similarly to "$q->peek"
385 otherwise.
386
387 $q->peekh ( [ $index ] )
388 Returns an item from the head of the heap or at the specified index.
389
390 $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
391 $q->enqueuep(5, 'foo');
392 $q->enqueuep(6, 'bar');
393 $q->enqueuep(4, 'sun');
394
395 print $q->peekh(0), "\n";
396 # Output: 6
397
398 $q = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
399 $q->enqueuep(5, 'foo');
400 $q->enqueuep(6, 'bar');
401 $q->enqueuep(4, 'sun');
402
403 print $q->peekh(0), "\n";
404 # Output: 4
405
406 $q->heap ( void )
407 Returns an array containing the heap data. Heap data consists of
408 priority numbers, not the data.
409
410 @h = $q->heap; # $MCE::Queue::HIGHEST
411 # Heap contains: 6, 5, 4
412
413 @h = $q->heap; # $MCE::Queue::LOWEST
414 # Heap contains: 4, 5, 6
415
417 • List::BinarySearch
418
419 The bsearch_num_pos method was helpful for accommodating the highest
420 and lowest order in MCE::Queue.
421
422 • POE::Queue::Array
423
424 For extra optimization, two if statements were adopted for checking
425 if the item belongs at the end or head of the queue.
426
427 • List::Priority
428
429 MCE::Queue supports both normal and priority queues.
430
431 • Thread::Queue
432
433 Thread::Queue is used as a template for identifying and documenting
434 the methods.
435
436 MCE::Queue is not fully compatible due to supporting normal and
437 priority queues simultaneously; e.g.
438
439 $q->enqueue( $item [, $item, ... ] ); # normal queue
440 $q->enqueuep( $p, $item [, $item, ... ] ); # priority queue
441
442 $q->dequeue( [ $count ] ); # priority data dequeues first
443 $q->dequeue_nb( [ $count ] );
444
445 $q->pending(); # counts both normal/priority queues
446
447 • Parallel::DataPipe
448
449 The recursion example, in the synopsis above, was largely adopted
450 from this module.
451
453 MCE, MCE::Core
454
456 Mario E. Roy, <marioeroy AT gmail DOT com>
457
458
459
460perl v5.34.0 2022-02-20 MCE::Queue(3)