MCE::Queue(3)         User Contributed Perl Documentation        MCE::Queue(3)

NAME

       MCE::Queue - Hybrid (normal and priority) queues

VERSION

       This document describes MCE::Queue version 1.862

SYNOPSIS

        use MCE;
        use MCE::Queue;

        my $q = MCE::Queue->new;

        $q->enqueue( qw/ wherefore art thou romeo / );

        my $item = $q->dequeue;

        if ( $q->pending ) {
           ;
        }

DESCRIPTION

       This module provides a queue interface supporting normal and priority
       queues, utilizing the IPC engine behind MCE. Data resides under the
       manager process. Three options are available for overriding the
       default values for new queues. The porder option applies to priority
       queues only.

        use MCE::Queue porder => $MCE::Queue::HIGHEST,
                       type   => $MCE::Queue::FIFO,
                       fast   => 0;

        use MCE::Queue;                # Same as above

        ## Possible values

        porder => $MCE::Queue::HIGHEST # Highest priority items dequeue first
                  $MCE::Queue::LOWEST  # Lowest priority items dequeue first

        type   => $MCE::Queue::FIFO    # First in, first out
                  $MCE::Queue::LIFO    # Last in, first out
                  $MCE::Queue::LILO    # (Synonym for FIFO)
                  $MCE::Queue::FILO    # (Synonym for LIFO)
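
       The effect of the type and porder options can be sketched in a single
       process, without any workers. The variable names below are
       illustrative only. Each queue is given a known number of items so
       that the blocking dequeue call returns immediately.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

# Same items, two queue types. Both queues live in this (manager) process.
my $fifo = MCE::Queue->new( type => $MCE::Queue::FIFO );
my $lifo = MCE::Queue->new( type => $MCE::Queue::LIFO );

$fifo->enqueue( 1, 2, 3 );
$lifo->enqueue( 1, 2, 3 );

# Each queue holds exactly three items, so dequeue(3) does not block.
my @fifo_order = $fifo->dequeue(3);   # oldest item first
my @lifo_order = $lifo->dequeue(3);   # newest item first

# porder decides which priority is served first.
my $low = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
$low->enqueuep( 9, 'nine' );
$low->enqueuep( 1, 'one'  );
my @low_order = $low->dequeue(2);     # priority 1 before priority 9

print "FIFO:   @fifo_order\n";   # FIFO:   1 2 3
print "LIFO:   @lifo_order\n";   # LIFO:   3 2 1
print "LOWEST: @low_order\n";    # LOWEST: one nine
```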

DEMONSTRATION

       MCE::Queue provides two run modes.

       (A) The "MCE::Queue" object is constructed before running MCE. The data
       resides under the manager process. Workers send and request data via
       IPC.

       (B) Workers might want to construct a queue for local access. In this
       mode, the data resides under the worker process and is not available
       to other workers or to the manager process.

        use MCE;
        use MCE::Queue;

        my $F = MCE::Queue->new( fast => 1 );
        my $consumers = 8;

        my $mce = MCE->new(

           task_end => sub {
              my ($mce, $task_id, $task_name) = @_;
              $F->end() if $task_name eq 'dir';
           },

           user_tasks => [{
              max_workers => 1, task_name => 'dir',

              user_func => sub {
                 ## Create a "standalone queue" only accessible to this worker.
                 my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);

                 while (defined (my $dir = $D->dequeue_nb)) {
                    my (@files, @dirs); foreach (glob("$dir/*")) {
                       if (-d $_) { push @dirs, $_; next; }
                       push @files, $_;
                    }
                    $D->enqueue(@dirs ) if scalar @dirs;
                    $F->enqueue(@files) if scalar @files;
                 }
              }
           },{
              max_workers => $consumers, task_name => 'file',

              user_func => sub {
                 while (defined (my $file = $F->dequeue)) {
                    MCE->say($file);
                 }
              }
           }]

        )->run({ user_args => [ $ARGV[0] || '.' ] });

        __END__

        Results taken from files_mce.pl and files_thr.pl on the web.
        https://github.com/marioroy/mce-examples/tree/master/other

        Usage:
           time ./files_mce.pl /usr 0 | wc -l
           time ./files_mce.pl /usr 1 | wc -l
           time ./files_thr.pl /usr   | wc -l

        Darwin (OS)    /usr:    216,271 files
           MCE::Queue, fast => 0 :    4.17s
           MCE::Queue, fast => 1 :    2.62s
           Thread::Queue         :    4.14s

        Linux (VM)     /usr:    186,154 files
           MCE::Queue, fast => 0 :   12.57s
           MCE::Queue, fast => 1 :    3.36s
           Thread::Queue         :    5.91s

        Solaris (VM)   /usr:    603,051 files
           MCE::Queue, fast => 0 :   39.04s
           MCE::Queue, fast => 1 :   18.08s
           Thread::Queue      * Perl not built to support threads

API DOCUMENTATION

   MCE::Queue->new ( [ queue => \@array, await => 1, fast => 1 ] )
       This creates a new queue. Available options are queue, porder, type,
       await, barrier, fast, and gather.

        use MCE;
        use MCE::Queue;

        my $q1 = MCE::Queue->new();
        my $q2 = MCE::Queue->new( queue  => [ 0, 1, 2 ] );

        my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
        my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST  );

        my $q5 = MCE::Queue->new( type   => $MCE::Queue::FIFO );
        my $q6 = MCE::Queue->new( type   => $MCE::Queue::LIFO );

        my $q7 = MCE::Queue->new( await  => 1, barrier => 0 );
        my $q8 = MCE::Queue->new( fast   => 1 );

       The "await" option, when enabled, allows workers to block
       (semaphore-like) until the number of items pending is equal to or
       less than a threshold value. The $q->await method is described below.

       On Unix platforms, "barrier" mode (enabled by default) prevents many
       workers from dequeuing simultaneously to lessen overhead for the OS
       kernel. Specify 0 to disable barrier mode and not allocate sockets. The
       barrier option has no effect if constructing the queue inside a thread
       or enabling "fast".

       The "fast" option speeds up dequeues and is not enabled by default. It
       is beneficial for queues that never call "dequeue_nb" and that do not
       alter the count value while running; e.g. ->dequeue($count).

       The "gather" option is mainly for use with MCE, when items should be
       passed to a callback function that appends them to the queue. Multiple
       queues may point to the same callback function. The callback receives
       the queue object as the first argument, followed by the items.

        sub _append {
           my ($q, @items) = @_;
           $q->enqueue(@items);
        }

        my $q7 = MCE::Queue->new( gather => \&_append );
        my $q8 = MCE::Queue->new( gather => \&_append );

        ## Items are diverted to the callback function, not the queue.
        $q7->enqueue( 'apple', 'orange' );

       Specifying the "gather" option allows one to store items temporarily
       while ensuring output order. A queue object is not required for this;
       the following simply demonstrates the gather option in the context of
       a queue.

        use MCE;
        use MCE::Queue;

        sub preserve_order {
           my %tmp; my $order_id = 1;

           return sub {
              my ($q, $chunk_id, $data) = @_;
              $tmp{$chunk_id} = $data;

              while (1) {
                 last unless exists $tmp{$order_id};
                 $q->enqueue( delete $tmp{$order_id++} );
              }

              return;
           };
        }

        my @squares; my $q = MCE::Queue->new(
           queue => \@squares, gather => preserve_order
        );

        my $mce = MCE->new(
           chunk_size => 1, input_data => [ 1 .. 100 ],
           user_func => sub {
              $q->enqueue( MCE->chunk_id, $_ * $_ );
           }
        );

        $mce->run;

        print "@squares\n";

   $q->await ( $pending_threshold )
       The await method is beneficial when wanting to throttle worker(s)
       appending to the queue, for example when consumers are running a bit
       behind and memory consumption must be kept in check. Below, the number
       of items pending will never go above 20.

        use Time::HiRes qw( sleep );

        use MCE::Flow;
        use MCE::Queue;

        my $q = MCE::Queue->new( await => 1, fast => 1 );
        my ( $producers, $consumers ) = ( 1, 8 );

        mce_flow {
           task_name   => [ 'producer', 'consumer' ],
           max_workers => [ $producers, $consumers ],
        },
        sub {
           ## producer
           for my $item ( 1 .. 100 ) {
              $q->enqueue($item);

              ## blocks until the # of items pending reaches <= 10
              if ($item % 10 == 0) {
                 MCE->say( 'pending: '.$q->pending() );
                 $q->await(10);
              }
           }

           ## notify consumers no more work
           $q->end();

        },
        sub {
           ## consumers
           while (defined (my $next = $q->dequeue())) {
              MCE->say( MCE->task_wid().': '.$next );
              sleep 0.100;
           }
        };

   $q->clear ( void )
       Clears the queue of any items. This has the effect of nulling the queue
       and the socket used for blocking.

        my @a; my $q = MCE::Queue->new( queue => \@a );

        @a = ();     ## bad, the blocking socket may become out of sync
        $q->clear;   ## ok
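
       As a minimal single-process sketch (variable names are illustrative),
       the item count can be checked before and after calling clear. The
       queue is assumed to remain usable afterwards.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

# Seed the queue through the "queue" option, then empty it with clear.
my $q = MCE::Queue->new( queue => [ 'a', 'b', 'c' ] );

my $before = $q->pending;   # three items supplied at construction
$q->clear;
my $after  = $q->pending;   # nothing left

$q->enqueue('d');           # new items may still be appended
my $again  = $q->pending;

print "before=$before after=$after again=$again\n";
```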

   $q->end ( void )
       Stops the queue from receiving more items. Any worker blocking on
       "dequeue" will be unblocked automatically. Subsequent calls to
       "dequeue" will behave like "dequeue_nb". Current API available since
       MCE 1.818.

        $q->end();

       MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one
       might want to enqueue "undef"s instead of calling "end". The number of
       "undef"s depends on how many items workers dequeue at a time.

        $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
        $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
        $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items
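
       A small single-process sketch of ending a queue follows. It drains
       the remaining items with the non-blocking dequeue_nb, so it cannot
       hang, and then relies on pending returning "undef" once the ended
       queue is empty, as described under $q->pending. Variable names are
       illustrative.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new;
$q->enqueue( 'first', 'second' );
$q->end;                      # no more items will be accepted

# Items already in the queue remain available after end.
my @seen;
while ( defined( my $item = $q->dequeue_nb ) ) {
   push @seen, $item;
}

# An ended and empty queue reports undef rather than 0.
my $pending = $q->pending;

print "seen: @seen\n";
print "pending is ", ( defined $pending ? $pending : 'undef' ), "\n";
```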

   $q->enqueue ( $item [, $item, ... ] )
       Appends a list of items onto the end of the normal queue.

        $q->enqueue( 'foo' );
        $q->enqueue( 'bar', 'baz' );

   $q->enqueuep ( $p, $item [, $item, ... ] )
       Appends a list of items onto the end of the priority queue with
       priority $p.

        $q->enqueuep( $priority, 'foo' );
        $q->enqueuep( $priority, 'bar', 'baz' );
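
       The interplay between the two methods can be sketched as follows,
       assuming the default settings (FIFO, highest priority first) and a
       single process. The item names are illustrative.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new;     # defaults: FIFO, highest priority first

$q->enqueue( 'normal-1', 'normal-2' );   # normal queue
$q->enqueuep( 5, 'p5-a', 'p5-b' );       # priority 5
$q->enqueuep( 9, 'p9' );                 # priority 9

# Five items are pending, so dequeue(5) returns without blocking.
# Priority items come first (highest priority leading), then normal items.
my @order = $q->dequeue(5);

print "@order\n";   # p9 p5-a p5-b normal-1 normal-2
```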

   $q->dequeue ( [ $count ] )
       Returns the requested number of items (default 1) from the queue.
       Priority data will always dequeue first before any data from the normal
       queue.

        $q->dequeue( 2 );
        $q->dequeue; # default 1

       The method will block if the queue contains zero items. If the queue
       contains fewer than the requested number of items, the method will not
       block, but return whatever items there are on the queue.

       The $count argument, used for requesting the number of items, is
       beneficial when workers are passing parameters through the queue. For
       this reason, always remember to dequeue using the same multiple for
       the count. This behavior is unlike Thread::Queue, which blocks until
       the requested number of items is available.

        # MCE::Queue 1.820 and prior releases
        while ( my @items = $q->dequeue(2) ) {
           last unless ( defined $items[0] );
           ...
        }

        # MCE::Queue 1.821 and later
        while ( my @items = $q->dequeue(2) ) {
           ...
        }
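
       The count behavior can be sketched in a single process. The loops
       below check pending first so that dequeue is never called on an empty
       queue, where it would block. The parameter names are illustrative.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new;

# Parameters travel through the queue as (name, value) pairs,
# so they are dequeued two at a time.
$q->enqueue( width => 80, height => 24 );

my %param;
while ( $q->pending ) {
   my ( $name, $value ) = $q->dequeue(2);
   $param{$name} = $value;
}

# With an odd number of items, the last call returns what is left
# instead of blocking until a second item arrives.
$q->enqueue( 1 .. 5 );

my @sizes;
while ( $q->pending ) {
   my @batch = $q->dequeue(2);
   push @sizes, scalar @batch;
}

print "width=$param{width} height=$param{height}\n";   # width=80 height=24
print "batch sizes: @sizes\n";                          # batch sizes: 2 2 1
```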

   $q->dequeue_nb ( [ $count ] )
       Returns the requested number of items (default 1) from the queue. As
       with dequeue, priority data will always dequeue first. This method is
       non-blocking and returns "undef" in the absence of data.

        $q->dequeue_nb( 2 );
        $q->dequeue_nb; # default 1
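
       A typical use is draining a queue without risking a block. The sketch
       below runs in a single process and uses illustrative names. Note that
       a loop of this form also stops early if an item itself is "undef".

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( queue => [ 'a', 'b' ] );

# Take items until the queue reports no data.
my @drained;
while ( defined( my $item = $q->dequeue_nb ) ) {
   push @drained, $item;
}

# The queue is empty now; the call returns immediately with undef.
my $empty = $q->dequeue_nb;

print "drained: @drained\n";   # drained: a b
print "empty queue gives ", ( defined $empty ? $empty : 'undef' ), "\n";
```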

   $q->insert ( $index, $item [, $item, ... ] )
       Adds the list of items to the queue at the specified index position (0
       is the head of the list). The head of the queue is that item which
       would be removed by a call to dequeue.

        $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
        $q->enqueue(1, 2, 3, 4);
        $q->insert(1, 'foo', 'bar');
        # Queue now contains: 1, foo, bar, 2, 3, 4

        $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
        $q->enqueue(1, 2, 3, 4);
        $q->insert(1, 'foo', 'bar');
        # Queue now contains: 1, 2, 3, foo, bar, 4

   $q->insertp ( $p, $index, $item [, $item, ... ] )
       Adds the list of items to the queue at the specified index position
       with priority. The behavior is similar to "$q->insert" otherwise.
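
       By analogy with the insert example, a sketch for insertp might look as
       follows. It assumes a FIFO queue in a single process and that all
       items share one priority level (5 here, chosen arbitrarily).

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( type => $MCE::Queue::FIFO );

$q->enqueuep( 5, 1, 2, 3, 4 );          # four items at priority 5
$q->insertp( 5, 1, 'foo', 'bar' );      # insert at index 1 of priority 5

# Six items are pending, so dequeue(6) returns without blocking.
my @items = $q->dequeue(6);

print "@items\n";   # 1 foo bar 2 3 4
```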

   $q->pending ( void )
       Returns the number of items in the queue. The count includes both
       normal and priority data. Returns "undef" if the queue has been ended,
       and there are no more items in the queue.

        $q = MCE::Queue->new();
        $q->enqueuep(5, 'foo', 'bar');
        $q->enqueue('sunny', 'day');

        print $q->pending(), "\n";
        # Output: 4

   $q->peek ( [ $index ] )
       Returns an item from the normal queue, at the specified index, without
       dequeuing anything. It defaults to the head of the queue if index is
       not specified. The head of the queue is that item which would be
       removed by a call to dequeue. Negative index values are supported,
       as with arrays.

        $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
        $q->enqueue(1, 2, 3, 4, 5);

        print $q->peek(1), ' ', $q->peek(-2), "\n";
        # Output: 2 4

        $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
        $q->enqueue(1, 2, 3, 4, 5);

        print $q->peek(1), ' ', $q->peek(-2), "\n";
        # Output: 4 2

   $q->peekp ( $p [, $index ] )
       Returns an item from the queue with priority, at the specified index,
       without dequeuing anything. It defaults to the head of the queue if
       index is not specified. The behavior is similar to "$q->peek"
       otherwise.
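
       A sketch of peekp on a FIFO queue, run in a single process, is shown
       next. The priority value 5 and the item names are arbitrary. Since
       peeking removes nothing, the pending count is unchanged afterwards.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
$q->enqueuep( 5, 'a', 'b', 'c' );

my $head   = $q->peekp(5);        # head of priority 5
my $second = $q->peekp(5, 1);     # index 1 within priority 5
my $last   = $q->peekp(5, -1);    # negative index counts from the end

my $pending = $q->pending;        # peeking does not dequeue

print "$head $second $last\n";    # a b c
print "pending: $pending\n";      # pending: 3
```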

   $q->peekh ( [ $index ] )
       Returns a priority number from the head of the heap or from the
       specified index.

        $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
        $q->enqueuep(5, 'foo');
        $q->enqueuep(6, 'bar');
        $q->enqueuep(4, 'sun');

        print $q->peekh(0), "\n";
        # Output: 6

        $q = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
        $q->enqueuep(5, 'foo');
        $q->enqueuep(6, 'bar');
        $q->enqueuep(4, 'sun');

        print $q->peekh(0), "\n";
        # Output: 4

   $q->heap ( void )
       Returns an array containing the heap data. Heap data consists of
       priority numbers, not the data.

        @h = $q->heap;   # $MCE::Queue::HIGHEST
        # Heap contains: 6, 5, 4

        @h = $q->heap;   # $MCE::Queue::LOWEST
        # Heap contains: 4, 5, 6
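
       The heap and peekh fragments above can be combined into one
       self-contained sketch for the HIGHEST order, run in a single process.
       It also shows that the head of the heap names the priority whose item
       dequeues next.

```perl
use strict;
use warnings;

use MCE;
use MCE::Queue;

my $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
$q->enqueuep( 5, 'foo' );
$q->enqueuep( 6, 'bar' );
$q->enqueuep( 4, 'sun' );

my @heap = $q->heap;       # priority numbers, highest first
my $top  = $q->peekh(0);   # priority at the head of the heap

# The queue is not empty, so dequeue returns without blocking.
my $item = $q->dequeue;    # item stored under the head priority

print "heap: @heap\n";     # heap: 6 5 4
print "top:  $top\n";      # top:  6
print "item: $item\n";     # item: bar
```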

ACKNOWLEDGMENTS

       ·  List::BinarySearch

          The bsearch_num_pos method was helpful for accommodating the highest
          and lowest order in MCE::Queue.

       ·  POE::Queue::Array

          For extra optimization, two if statements were adopted for checking
          if the item belongs at the end or head of the queue.

       ·  List::Priority

          MCE::Queue supports both normal and priority queues.

       ·  Thread::Queue

          Thread::Queue is used as a template for identifying and documenting
          the methods.

          MCE::Queue is not fully compatible due to supporting normal and
          priority queues simultaneously; e.g.

           $q->enqueue( $item [, $item, ... ] );         # normal queue
           $q->enqueuep( $p, $item [, $item, ... ] );    # priority queue

           $q->dequeue( [ $count ] );      # priority data dequeues first
           $q->dequeue_nb( [ $count ] );

           $q->pending();                  # counts both normal/priority queues

       ·  Parallel::DataPipe

          The recursion example, in the demonstration above, was largely
          adopted from this module.

INDEX

       MCE, MCE::Core

AUTHOR

       Mario E. Roy, <marioeroy AT gmail DOT com>



perl v5.30.0                      2019-09-19                     MCE::Queue(3)