1MCE::Queue(3)         User Contributed Perl Documentation        MCE::Queue(3)
2
3
4

NAME

6       MCE::Queue - Hybrid (normal and priority) queues
7

VERSION

9       This document describes MCE::Queue version 1.837
10

SYNOPSIS

12        use MCE;
13        use MCE::Queue;
14
15        my $q = MCE::Queue->new;
16
17        $q->enqueue( qw/ wherefore art thou romeo / );
18
19        my $item = $q->dequeue;
20
21        if ( $q->pending ) {
22           ;
23        }
24

DESCRIPTION

26       This module provides a queue interface supporting normal and priority
27       queues and utilizing the IPC engine behind MCE. Data resides under the
28       manager process. Three options are available for overriding the default
29       value for new queues. The porder option applies to priority queues
30       only.
31
32        use MCE::Queue porder => $MCE::Queue::HIGHEST,
33                       type   => $MCE::Queue::FIFO,
34                       fast   => 0;
35
36        use MCE::Queue;                # Same as above
37
38        ## Possible values
39
40        porder => $MCE::Queue::HIGHEST # Highest priority items dequeue first
41                  $MCE::Queue::LOWEST  # Lowest priority items dequeue first
42
43        type   => $MCE::Queue::FIFO    # First in, first out
44                  $MCE::Queue::LIFO    # Last in, first out
45                  $MCE::Queue::LILO    # (Synonym for FIFO)
46                  $MCE::Queue::FILO    # (Synonym for LIFO)
47

DEMONSTRATION

49       MCE::Queue provides two run modes.
50
51       (A) The "MCE::Queue" object is constructed before running MCE. The data
52       resides under the manager process. Workers send and request data via
53       IPC.
54
55       (B) Workers might want to construct a queue for local access. In this
56       mode, the data resides under the worker process and not available to
57       other workers including the manager process.
58
59        use MCE;
60        use MCE::Queue;
61
62        my $F = MCE::Queue->new( fast => 1 );
63        my $consumers = 8;
64
65        my $mce = MCE->new(
66
67           task_end => sub {
68              my ($mce, $task_id, $task_name) = @_;
69              $F->end() if $task_name eq 'dir';
70           },
71
72           user_tasks => [{
73              max_workers => 1, task_name => 'dir',
74
75              user_func => sub {
76                 ## Create a "standalone queue" only accessible to this worker.
77                 my $D = MCE::Queue->new(queue => [ MCE->user_args->[0] ]);
78
79                 while (defined (my $dir = $D->dequeue_nb)) {
80                    my (@files, @dirs); foreach (glob("$dir/*")) {
81                       if (-d $_) { push @dirs, $_; next; }
82                       push @files, $_;
83                    }
84                    $D->enqueue(@dirs ) if scalar @dirs;
85                    $F->enqueue(@files) if scalar @files;
86                 }
87              }
88           },{
89              max_workers => $consumers, task_name => 'file',
90
91              user_func => sub {
92                 while (defined (my $file = $F->dequeue)) {
93                    MCE->say($file);
94                 }
95              }
96           }]
97
98        )->run({ user_args => [ $ARGV[0] || '.' ] });
99
100        __END__
101
102        Results taken from files_mce.pl and files_thr.pl on the web.
103        https://github.com/marioroy/mce-examples/tree/master/other
104
105        Usage:
106           time ./files_mce.pl /usr 0 | wc -l
107           time ./files_mce.pl /usr 1 | wc -l
108           time ./files_thr.pl /usr   | wc -l
109
110        Darwin (OS)    /usr:    216,271 files
111           MCE::Queue, fast => 0 :    4.17s
112           MCE::Queue, fast => 1 :    2.62s
113           Thread::Queue         :    4.14s
114
115        Linux (VM)     /usr:    186,154 files
116           MCE::Queue, fast => 0 :   12.57s
117           MCE::Queue, fast => 1 :    3.36s
118           Thread::Queue         :    5.91s
119
120        Solaris (VM)   /usr:    603,051 files
121           MCE::Queue, fast => 0 :   39.04s
122           MCE::Queue, fast => 1 :   18.08s
123           Thread::Queue      * Perl not built to support threads
124

API DOCUMENTATION

126   MCE::Queue->new ( [ queue => \@array, await => 1, fast => 1 ] )
127       This creates a new queue. Available options are queue, porder, type,
128       await, fast, and gather.
129
130        use MCE;
131        use MCE::Queue;
132
133        my $q1 = MCE::Queue->new();
134        my $q2 = MCE::Queue->new( queue  => [ 0, 1, 2 ] );
135
136        my $q3 = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
137        my $q4 = MCE::Queue->new( porder => $MCE::Queue::LOWEST  );
138
139        my $q5 = MCE::Queue->new( type   => $MCE::Queue::FIFO );
140        my $q6 = MCE::Queue->new( type   => $MCE::Queue::LIFO );
141
142        my $q7 = MCE::Queue->new( await  => 1 );
143        my $q8 = MCE::Queue->new( fast   => 1 );
144
145       The 'await' option, when enabled, allows workers to block (semaphore-
146       like) until the number of items pending is equal or less than a
147       threshold value.  The $q->await method is described below.
148
149       The 'fast' option speeds up dequeues and is not enabled by default. It
150       is beneficial for queues not calling (->clear or ->dequeue_nb) and not
151       altering the optional count value while running; e.g.
152       ->dequeue($count). Basically, do not enable 'fast' if varying the count
153       dynamically.
154
155       The 'gather' option is mainly for running with MCE and wanting to pass
156       item(s) to a callback function for appending to the queue. Multiple
157       queues may point to the same callback function. The callback receives
158       the queue object as the first argument and items after it.
159
160        sub _append {
161           my ($q, @items) = @_;
162           $q->enqueue(@items);
163        }
164
165        my $q7 = MCE::Queue->new( gather => \&_append );
166        my $q8 = MCE::Queue->new( gather => \&_append );
167
168        ## Items are diverted to the callback function, not the queue.
169        $q7->enqueue( 'apple', 'orange' );
170
171       Specifying the 'gather' option allows one to store items temporarily
172       while ensuring output order. Although a queue object is not required,
173       this is simply a demonstration of the gather option in the context of a
174       queue.
175
176        use MCE;
177        use MCE::Queue;
178
179        sub preserve_order {
180           my %tmp; my $order_id = 1;
181
182           return sub {
183              my ($q, $chunk_id, $data) = @_;
184              $tmp{$chunk_id} = $data;
185
186              while (1) {
187                 last unless exists $tmp{$order_id};
188                 $q->enqueue( delete $tmp{$order_id++} );
189              }
190
191              return;
192           };
193        }
194
195        my @squares; my $q = MCE::Queue->new(
196           queue => \@squares, gather => preserve_order
197        );
198
199        my $mce = MCE->new(
200           chunk_size => 1, input_data => [ 1 .. 100 ],
201           user_func => sub {
202              $q->enqueue( MCE->chunk_id, $_ * $_ );
203           }
204        );
205
206        $mce->run;
207
208        print "@squares\n";
209
210   $q->await ( $pending_threshold )
211       The await method is beneficial when wanting to throttle worker(s)
212       appending to the queue. Perhaps, consumers are running a bit behind and
213       wanting to keep tabs on memory consumption. Below, the number of items
214       pending will never go above 20.
215
216        use Time::HiRes qw( sleep );
217
218        use MCE::Flow;
219        use MCE::Queue;
220
221        my $q = MCE::Queue->new( await => 1, fast => 1 );
222        my ( $producers, $consumers ) = ( 1, 8 );
223
224        mce_flow {
225           task_name   => [ 'producer', 'consumer' ],
226           max_workers => [ $producers, $consumers ],
227        },
228        sub {
229           ## producer
230           for my $item ( 1 .. 100 ) {
231              $q->enqueue($item);
232
233              ## blocks until the # of items pending reaches <= 10
234              if ($item % 10 == 0) {
235                 MCE->say( 'pending: '.$q->pending() );
236                 $q->await(10);
237              }
238           }
239
240           ## notify consumers no more work
241           $q->end();
242
243        },
244        sub {
245           ## consumers
246           while (defined (my $next = $q->dequeue())) {
247              MCE->say( MCE->task_wid().': '.$next );
248              sleep 0.100;
249           }
250        };
251
252   $q->clear ( void )
253       Clears the queue of any items. This has the effect of nulling the queue
254       and the socket used for blocking.
255
256        my @a; my $q = MCE::Queue->new( queue => \@a );
257
258        @a = ();     ## bad, the blocking socket may become out of sync
259        $q->clear;   ## ok
260
261   $q->end ( void )
262       Stops the queue from receiving more items. Any worker blocking on
263       "dequeue" will be unblocked automatically. Subsequent calls to
264       "dequeue" will behave like "dequeue_nb". Current API available since
265       MCE 1.818.
266
267        $q->end();
268
269       MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one
270       might want to enqueue "undef"'s versus calling "end". The number of
271       "undef"'s depends on how many items workers dequeue at a time.
272
273        $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
274        $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
275        $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items
276
277   $q->enqueue ( $item [, $item, ... ] )
278       Appends a list of items onto the end of the normal queue.
279
280        $q->enqueue( 'foo' );
281        $q->enqueue( 'bar', 'baz' );
282
283   $q->enqueuep ( $p, $item [, $item, ... ] )
284       Appends a list of items onto the end of the priority queue with
285       priority.
286
287        $q->enqueue( $priority, 'foo' );
288        $q->enqueue( $priority, 'bar', 'baz' );
289
290   $q->dequeue ( [ $count ] )
291       Returns the requested number of items (default 1) from the queue.
292       Priority data will always dequeue first before any data from the normal
293       queue.
294
295        $q->dequeue( 2 );
296        $q->dequeue; # default 1
297
298       The method will block if the queue contains zero items. If the queue
299       contains fewer than the requested number of items, the method will not
300       block, but return whatever items there are on the queue.
301
302       The $count, used for requesting the number of items, is beneficial when
303       workers are passing parameters through the queue. For this reason,
304       always remember to dequeue using the same multiple for the count. This
305       is unlike Thread::Queue which will block until the requested number of
306       items are available.
307
308        # MCE::Queue 1.820 and prior releases
309        while ( my @items = $q->dequeue(2) ) {
310           last unless ( defined $items[0] );
311           ...
312        }
313
314        # MCE::Queue 1.821 and later
315        while ( my @items = $q->dequeue(2) ) {
316           ...
317        }
318
319   $q->dequeue_nb ( [ $count ] )
320       Returns the requested number of items (default 1) from the queue. Like
321       with dequeue, priority data will always dequeue first. This method is
322       non-blocking and returns "undef" in the absence of data.
323
324        $q->dequeue_nb( 2 );
325        $q->dequeue_nb; # default 1
326
327   $q->insert ( $index, $item [, $item, ... ] )
328       Adds the list of items to the queue at the specified index position (0
329       is the head of the list). The head of the queue is that item which
330       would be removed by a call to dequeue.
331
332        $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
333        $q->enqueue(1, 2, 3, 4);
334        $q->insert(1, 'foo', 'bar');
335        # Queue now contains: 1, foo, bar, 2, 3, 4
336
337        $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
338        $q->enqueue(1, 2, 3, 4);
339        $q->insert(1, 'foo', 'bar');
340        # Queue now contains: 1, 2, 3, 'foo', 'bar', 4
341
342   $q->insertp ( $p, $index, $item [, $item, ... ] )
343       Adds the list of items to the queue at the specified index position
344       with priority. The behavior is similarly to "$q-"insert> otherwise.
345
346   $q->pending ( void )
347       Returns the number of items in the queue. The count includes both
348       normal and priority data. Returns "undef" if the queue has been ended,
349       and there are no more items in the queue.
350
351        $q = MCE::Queue->new();
352        $q->enqueuep(5, 'foo', 'bar');
353        $q->enqueue('sunny', 'day');
354
355        print $q->pending(), "\n";
356        # Output: 4
357
358   $q->peek ( [ $index ] )
359       Returns an item from the normal queue, at the specified index, without
360       dequeuing anything. It defaults to the head of the queue if index is
361       not specified. The head of the queue is that item which would be
362       removed by a call to dequeue. Negative index values are supported,
363       similarly to arrays.
364
365        $q = MCE::Queue->new( type => $MCE::Queue::FIFO );
366        $q->enqueue(1, 2, 3, 4, 5);
367
368        print $q->peek(1), ' ', $q->peek(-2), "\n";
369        # Output: 2 4
370
371        $q = MCE::Queue->new( type => $MCE::Queue::LIFO );
372        $q->enqueue(1, 2, 3, 4, 5);
373
374        print $q->peek(1), ' ', $q->peek(-2), "\n";
375        # Output: 4 2
376
377   $q->peekp ( $p [, $index ] )
378       Returns an item from the queue with priority, at the specified index,
379       without dequeuing anything. It defaults to the head of the queue if
380       index is not specified. The behavior is similarly to "$q-"peek>
381       otherwise.
382
383   $q->peekh ( [ $index ] )
384       Returns an item from the head of the heap or at the specified index.
385
386        $q = MCE::Queue->new( porder => $MCE::Queue::HIGHEST );
387        $q->enqueuep(5, 'foo');
388        $q->enqueuep(6, 'bar');
389        $q->enqueuep(4, 'sun');
390
391        print $q->peekh(0), "\n";
392        # Output: 6
393
394        $q = MCE::Queue->new( porder => $MCE::Queue::LOWEST );
395        $q->enqueuep(5, 'foo');
396        $q->enqueuep(6, 'bar');
397        $q->enqueuep(4, 'sun');
398
399        print $q->peekh(0), "\n";
400        # Output: 4
401
402   $q->heap ( void )
403       Returns an array containing the heap data. Heap data consists of
404       priority numbers, not the data.
405
406        @h = $q->heap;   # $MCE::Queue::HIGHEST
407        # Heap contains: 6, 5, 4
408
409        @h = $q->heap;   # $MCE::Queue::LOWEST
410        # Heap contains: 4, 5, 6
411

ACKNOWLEDGMENTS

413       List::BinarySearch
414          The bsearch_num_pos method was helpful for accommodating the highest
415          and lowest order in MCE::Queue.
416
417       POE::Queue::Array
418          For extra optimization, two if statements were adopted for checking
419          if the item belongs at the end or head of the queue.
420
421       List::Priority
422          MCE::Queue supports both normal and priority queues.
423
424       Thread::Queue
425          Thread::Queue is used as a template for identifying and documenting
426          the methods.
427
428          MCE::Queue is not fully compatible due to supporting normal and
429          priority queues simultaneously; e.g.
430
431           $q->enqueue( $item [, $item, ... ] );         # normal queue
432           $q->enqueuep( $p, $item [, $item, ... ] );    # priority queue
433
434           $q->dequeue( [ $count ] );      # priority data dequeues first
435           $q->dequeue_nb( [ $count ] );
436
437           $q->pending();                  # counts both normal/priority queues
438
439       Parallel::DataPipe
440          The recursion example, in the synopsis above, was largely adopted
441          from this module.
442

INDEX

444       MCE, MCE::Core
445

AUTHOR

447       Mario E. Roy, <marioeroy AT gmail DOT com>
448
449
450
451perl v5.28.0                      2018-08-25                     MCE::Queue(3)
Impressum