MCE::Queue(3) User Contributed Perl Documentation MCE::Queue(3)

MCE::Queue - Hybrid (normal and priority) queues

This document describes MCE::Queue version 1.837

    use MCE;
    use MCE::Queue;

    my $q = MCE::Queue->new;

    $q->enqueue( qw/ wherefore art thou romeo / );

    my $item = $q->dequeue;

    if ( $q->pending ) {
        ;
    }

This module provides a queue interface supporting normal and priority
queues and utilizing the IPC engine behind MCE. Data resides under the
manager process. Three options are available for overriding the default
values for new queues. The porder option applies to priority queues
only.

    use MCE::Queue porder => $MCE::Queue::HIGHEST,
                   type   => $MCE::Queue::FIFO,
                   fast   => 0;

    use MCE::Queue;   # Same as above

    ## Possible values

    porder => $MCE::Queue::HIGHEST   # Highest priority items dequeue first
              $MCE::Queue::LOWEST    # Lowest priority items dequeue first

    type   => $MCE::Queue::FIFO      # First in, first out
              $MCE::Queue::LIFO      # Last in, first out
              $MCE::Queue::LILO      # (Synonym for FIFO)
              $MCE::Queue::FILO      # (Synonym for LIFO)

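The effect of the type option can be seen with a small single-process sketch. It assumes the queue is created and drained in the same (manager) process, and it uses dequeue_nb so that nothing blocks once a queue is empty. The variable names are illustrative only.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

# FIFO (the default): items leave in the order they were enqueued.
my $fifo = MCE::Queue->new( type => $MCE::Queue::FIFO );
$fifo->enqueue( 1, 2, 3 );

# LIFO: the most recently enqueued item leaves first.
my $lifo = MCE::Queue->new( type => $MCE::Queue::LIFO );
$lifo->enqueue( 1, 2, 3 );

my ( @fifo_order, @lifo_order );

# dequeue_nb is non-blocking and returns undef once the queue is empty.
while ( defined( my $item = $fifo->dequeue_nb ) ) { push @fifo_order, $item; }
while ( defined( my $item = $lifo->dequeue_nb ) ) { push @lifo_order, $item; }

print "FIFO: @fifo_order\n";   # FIFO: 1 2 3
print "LIFO: @lifo_order\n";   # LIFO: 3 2 1
```

Passing the option to new affects only that queue, whereas passing it on the "use MCE::Queue" line changes the default for queues created afterwards.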
MCE::Queue provides two run modes.

(A) The "MCE::Queue" object is constructed before running MCE. The data
resides under the manager process. Workers send and request data via
IPC.

(B) Workers might want to construct a queue for local access. In this
mode, the data resides under the worker process and is not available to
other workers, including the manager process.

    use MCE;
    use MCE::Queue;

    my $F = MCE::Queue->new( fast => 1 );
    my $consumers = 8;

    my $mce = MCE->new(

        task_end => sub {
            my ($mce, $task_id, $task_name) = @_;
            $F->end() if $task_name eq 'dir';
        },

        user_tasks => [{
            max_workers => 1, task_name => 'dir',

            user_func => sub {
                ## Create a "standalone queue" only accessible to this worker.
                my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);

                while (defined (my $dir = $D->dequeue_nb)) {
                    my (@files, @dirs); foreach (glob("$dir/*")) {
                        if (-d $_) { push @dirs, $_; next; }
                        push @files, $_;
                    }
                    $D->enqueue(@dirs ) if scalar @dirs;
                    $F->enqueue(@files) if scalar @files;
                }
            }
        },{
            max_workers => $consumers, task_name => 'file',

            user_func => sub {
                while (defined (my $file = $F->dequeue)) {
                    MCE->say($file);
                }
            }
        }]

    )->run({ user_args => [ $ARGV[0] || '.' ] });

    __END__

    Results taken from files_mce.pl and files_thr.pl on the web.
    https://github.com/marioroy/mce-examples/tree/master/other

    Usage:
        time ./files_mce.pl /usr 0 | wc -l
        time ./files_mce.pl /usr 1 | wc -l
        time ./files_thr.pl /usr   | wc -l

    Darwin (OS)    /usr: 216,271 files
        MCE::Queue, fast => 0 :  4.17s
        MCE::Queue, fast => 1 :  2.62s
        Thread::Queue         :  4.14s

    Linux (VM)     /usr: 186,154 files
        MCE::Queue, fast => 0 : 12.57s
        MCE::Queue, fast => 1 :  3.36s
        Thread::Queue         :  5.91s

    Solaris (VM)   /usr: 603,051 files
        MCE::Queue, fast => 0 : 39.04s
        MCE::Queue, fast => 1 : 18.08s
        Thread::Queue         * Perl not built to support threads

MCE::Queue->new ( [ queue => \@array, await => 1, fast => 1 ] )
This creates a new queue. Available options are queue, porder, type,
await, fast, and gather.

    use MCE;
    use MCE::Queue;

    my $q1 = MCE::Queue->new();
    my $q2 = MCE::Queue->new( queue => [ 0, 1, 2 ] );

    my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
    my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST );

    my $q5 = MCE::Queue->new( type => $MCE::Queue::FIFO );
    my $q6 = MCE::Queue->new( type => $MCE::Queue::LIFO );

    my $q7 = MCE::Queue->new( await => 1 );
    my $q8 = MCE::Queue->new( fast => 1 );

The 'await' option, when enabled, allows workers to block
(semaphore-like) until the number of items pending is less than or
equal to a threshold value. The $q->await method is described below.

The 'fast' option speeds up dequeues and is not enabled by default. It
benefits queues that never call ->clear or ->dequeue_nb and that keep
the optional count value constant while running; e.g.
->dequeue($count). In short, do not enable 'fast' if the count varies
dynamically.

The 'gather' option is mainly for use with MCE, when items should be
passed to a callback function that appends them to the queue. Multiple
queues may point to the same callback function. The callback receives
the queue object as the first argument and the items after it.

    sub _append {
        my ($q, @items) = @_;
        $q->enqueue(@items);
    }

    my $q7 = MCE::Queue->new( gather => \&_append );
    my $q8 = MCE::Queue->new( gather => \&_append );

    ## Items are diverted to the callback function, not the queue.
    $q7->enqueue( 'apple', 'orange' );

Specifying the 'gather' option allows one to store items temporarily
while ensuring output order. Although a queue object is not required,
this is simply a demonstration of the gather option in the context of a
queue.

    use MCE;
    use MCE::Queue;

    sub preserve_order {
        my %tmp; my $order_id = 1;

        return sub {
            my ($q, $chunk_id, $data) = @_;
            $tmp{$chunk_id} = $data;

            while (1) {
                last unless exists $tmp{$order_id};
                $q->enqueue( delete $tmp{$order_id++} );
            }

            return;
        };
    }

    my @squares; my $q = MCE::Queue->new(
        queue => \@squares, gather => preserve_order
    );

    my $mce = MCE->new(
        chunk_size => 1, input_data => [ 1 .. 100 ],
        user_func => sub {
            $q->enqueue( MCE->chunk_id, $_ * $_ );
        }
    );

    $mce->run;

    print "@squares\n";

$q->await ( $pending_threshold )
The await method is useful for throttling workers that append to the
queue, for example when consumers are running behind and memory
consumption must be kept in check. Below, the number of items pending
will never go above 20.

    use Time::HiRes qw( sleep );

    use MCE::Flow;
    use MCE::Queue;

    my $q = MCE::Queue->new( await => 1, fast => 1 );
    my ( $producers, $consumers ) = ( 1, 8 );

    mce_flow {
        task_name   => [ 'producer', 'consumer' ],
        max_workers => [ $producers, $consumers ],
    },
    sub {
        ## producer
        for my $item ( 1 .. 100 ) {
            $q->enqueue($item);

            ## blocks until the # of items pending reaches <= 10
            if ($item % 10 == 0) {
                MCE->say( 'pending: '.$q->pending() );
                $q->await(10);
            }
        }

        ## notify consumers no more work
        $q->end();

    },
    sub {
        ## consumers
        while (defined (my $next = $q->dequeue())) {
            MCE->say( MCE->task_wid().': '.$next );
            sleep 0.100;
        }
    };

$q->clear ( void )
Clears the queue of any items. This has the effect of nulling the queue
and the socket used for blocking.

    my @a; my $q = MCE::Queue->new( queue => \@a );

    @a = ();     ## bad, the blocking socket may become out of sync
    $q->clear;   ## ok

$q->end ( void )
Stops the queue from receiving more items. Any worker blocking on
"dequeue" will be unblocked automatically. Subsequent calls to
"dequeue" will behave like "dequeue_nb". Current API available since
MCE 1.818.

    $q->end();

MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one
might want to enqueue "undef" values instead of calling "end". The
number of "undef" values depends on how many items workers dequeue at a
time.

    $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
    $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
    $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items

$q->enqueue ( $item [, $item, ... ] )
Appends a list of items onto the end of the normal queue.

    $q->enqueue( 'foo' );
    $q->enqueue( 'bar', 'baz' );

$q->enqueuep ( $p, $item [, $item, ... ] )
Appends a list of items onto the end of the priority queue with
priority $p.

    $q->enqueuep( $priority, 'foo' );
    $q->enqueuep( $priority, 'bar', 'baz' );

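As a rough illustration of how enqueue and enqueuep interact, the sketch below fills both the normal and the priority queue and then drains everything in a single process. The item names and priority values are made up for the example.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

# HIGHEST is the default porder; it is spelled out here for clarity.
my $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );

$q->enqueue( 'normal-1', 'normal-2' );    # normal queue
$q->enqueuep( 5, 'low-a', 'low-b' );      # priority queue, priority 5
$q->enqueuep( 9, 'high-a' );              # priority queue, priority 9

# Drain without blocking; dequeue_nb returns undef when nothing is left.
my @order;
while ( defined( my $item = $q->dequeue_nb ) ) {
    push @order, $item;
}

print join( ', ', @order ), "\n";
# high-a, low-a, low-b, normal-1, normal-2
```

Priority items leave first, highest priority ahead of lower, and items from the normal queue follow only once the priority data is exhausted.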
$q->dequeue ( [ $count ] )
Returns the requested number of items (default 1) from the queue.
Priority data will always dequeue first before any data from the normal
queue.

    $q->dequeue( 2 );
    $q->dequeue;   # default 1

The method will block if the queue contains zero items. If the queue
contains fewer than the requested number of items, the method will not
block, but return whatever items there are on the queue.

The $count argument is useful when workers pass parameters through the
queue. Because dequeue returns fewer items rather than blocking when
the queue is short, always remember to dequeue using the same multiple
for the count. This differs from Thread::Queue, which blocks until the
requested number of items is available.

    # MCE::Queue 1.820 and prior releases
    while ( my @items = $q->dequeue(2) ) {
        last unless ( defined $items[0] );
        ...
    }

    # MCE::Queue 1.821 and later
    while ( my @items = $q->dequeue(2) ) {
        ...
    }

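A minimal sketch of passing parameters in fixed-size groups follows. To keep it runnable in a single process it uses dequeue_nb, which takes the same optional count as dequeue, and it stops on pending rather than on a blocking call. The file names and sizes are invented for illustration.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new;

# Each logical record is a (name, size) pair, so items are always
# enqueued and dequeued in multiples of two.
$q->enqueue( 'alpha.txt', 10, 'beta.txt', 20 );

my %size_of;

while ( $q->pending ) {
    my ( $name, $size ) = $q->dequeue_nb(2);
    $size_of{$name} = $size;
}

print "$_ => $size_of{$_}\n" for sort keys %size_of;
# alpha.txt => 10
# beta.txt => 20
```

If the count used for dequeuing differed from the record size, names and sizes would drift out of alignment, which is why the same multiple should be used throughout.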
$q->dequeue_nb ( [ $count ] )
Returns the requested number of items (default 1) from the queue. Like
with dequeue, priority data will always dequeue first. This method is
non-blocking and returns "undef" in the absence of data.

    $q->dequeue_nb( 2 );
    $q->dequeue_nb;   # default 1

$q->insert ( $index, $item [, $item, ... ] )
Adds the list of items to the queue at the specified index position (0
is the head of the list). The head of the queue is that item which
would be removed by a call to dequeue.

    $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
    $q->enqueue(1, 2, 3, 4);
    $q->insert(1, 'foo', 'bar');
    # Queue now contains: 1, foo, bar, 2, 3, 4

    $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
    $q->enqueue(1, 2, 3, 4);
    $q->insert(1, 'foo', 'bar');
    # Queue now contains: 1, 2, 3, foo, bar, 4

$q->insertp ( $p, $index, $item [, $item, ... ] )
Adds the list of items to the queue at the specified index position
with priority. The behavior is otherwise similar to "$q->insert".

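Since insertp has no example of its own, here is a small sketch modeled on the FIFO example for insert. It assumes items already exist at the chosen priority; the priority value and item names are arbitrary.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( type => $MCE::Queue::FIFO );

# Three items share priority 5.
$q->enqueuep( 5, 'a', 'b', 'c' );

# Insert two items at index 1 of the priority-5 list (index 0 is the head).
$q->insertp( 5, 1, 'x', 'y' );

# Drain without blocking to observe the resulting order.
my @order;
while ( defined( my $item = $q->dequeue_nb ) ) {
    push @order, $item;
}

print "@order\n";   # a x y b c
```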
$q->pending ( void )
Returns the number of items in the queue. The count includes both
normal and priority data. Returns "undef" if the queue has been ended,
and there are no more items in the queue.

    $q = MCE::Queue->new();
    $q->enqueuep(5, 'foo', 'bar');
    $q->enqueue('sunny', 'day');

    print $q->pending(), "\n";
    # Output: 4

$q->peek ( [ $index ] )
Returns an item from the normal queue, at the specified index, without
dequeuing anything. It defaults to the head of the queue if index is
not specified. The head of the queue is that item which would be
removed by a call to dequeue. Negative index values are supported,
similarly to arrays.

    $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
    $q->enqueue(1, 2, 3, 4, 5);

    print $q->peek(1), ' ', $q->peek(-2), "\n";
    # Output: 2 4

    $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
    $q->enqueue(1, 2, 3, 4, 5);

    print $q->peek(1), ' ', $q->peek(-2), "\n";
    # Output: 4 2

$q->peekp ( $p [, $index ] )
Returns an item from the queue with priority, at the specified index,
without dequeuing anything. It defaults to the head of the queue if
index is not specified. The behavior is otherwise similar to
"$q->peek".

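The following sketch shows peekp on a FIFO queue holding items at two priorities. It is only an illustration; the priorities and item names are arbitrary.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( type => $MCE::Queue::FIFO );

$q->enqueuep( 5, 'foo', 'bar', 'baz' );
$q->enqueuep( 7, 'sun' );

my $head   = $q->peekp(5);        # head of the priority-5 items
my $second = $q->peekp(5, 1);     # index 1 within priority 5
my $last   = $q->peekp(5, -1);    # negative index counts from the end
my $other  = $q->peekp(7);        # head of the priority-7 items

print "$head $second $last $other\n";   # foo bar baz sun

# Peeking does not remove anything from the queue.
my $count = $q->pending;
print "$count\n";                       # 4
```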
$q->peekh ( [ $index ] )
Returns an item from the head of the heap or at the specified index.

    $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
    $q->enqueuep(5, 'foo');
    $q->enqueuep(6, 'bar');
    $q->enqueuep(4, 'sun');

    print $q->peekh(0), "\n";
    # Output: 6

    $q = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
    $q->enqueuep(5, 'foo');
    $q->enqueuep(6, 'bar');
    $q->enqueuep(4, 'sun');

    print $q->peekh(0), "\n";
    # Output: 4

$q->heap ( void )
Returns an array containing the heap data. Heap data consists of
priority numbers, not the data.

    @h = $q->heap;   # $MCE::Queue::HIGHEST
    # Heap contains: 6, 5, 4

    @h = $q->heap;   # $MCE::Queue::LOWEST
    # Heap contains: 4, 5, 6

List::BinarySearch
The bsearch_num_pos method was helpful for accommodating the highest
and lowest order in MCE::Queue.

POE::Queue::Array
For extra optimization, two if statements were adopted for checking
if the item belongs at the end or head of the queue.

List::Priority
MCE::Queue supports both normal and priority queues.

Thread::Queue
Thread::Queue is used as a template for identifying and documenting
the methods.

MCE::Queue is not fully compatible due to supporting normal and
priority queues simultaneously; e.g.

    $q->enqueue( $item [, $item, ... ] );         # normal queue
    $q->enqueuep( $p, $item [, $item, ... ] );    # priority queue

    $q->dequeue( [ $count ] );      # priority data dequeues first
    $q->dequeue_nb( [ $count ] );

    $q->pending();                  # counts both normal/priority queues

Parallel::DataPipe
The recursion example, in the synopsis above, was largely adopted
from this module.

MCE, MCE::Core

Mario E. Roy, <marioeroy AT gmail DOT com>

perl v5.28.0 2018-08-25 MCE::Queue(3)