1MCE::Shared::Queue(3) User Contributed Perl Documentation MCE::Shared::Queue(3)
2
3
4

NAME

6       MCE::Shared::Queue - Hybrid-queue helper class
7

VERSION

9       This document describes MCE::Shared::Queue version 1.840
10

DESCRIPTION

12       A queue helper class for use as a standalone or managed by MCE::Shared.
13
14       This module is mostly compatible with MCE::Queue except for the
15       "gather" option which is not supported in this context. It provides a
16       queue interface supporting normal and priority queues. Data from shared
17       queues reside under the shared-manager process, otherwise locally.
18

SYNOPSIS

20        # non-shared or local construction for use by a single process
21
22        use MCE::Shared::Queue;
23
24        my $qu = MCE::Shared::Queue->new(
25           await => 1, fast => 0, queue => [ "." ]
26        );
27
28        # construction for sharing with other threads and processes
29
30        use MCE::Shared;
31        use MCE::Shared::Queue;
32
33        my $qu = MCE::Shared->queue(
34           porder => $MCE::Shared::Queue::HIGHEST,
35           type   => $MCE::Shared::Queue::FIFO,
36           fast   => 0
37        );
38
39        # possible values for "porder" and "type"
40
41        porder =>
42           $MCE::Shared::Queue::HIGHEST # Highest priority items dequeue first
43           $MCE::Shared::Queue::LOWEST  # Lowest priority items dequeue first
44
45        type =>
46           $MCE::Shared::Queue::FIFO    # First in, first out
47           $MCE::Shared::Queue::LIFO    # Last in, first out
48           $MCE::Shared::Queue::LILO    # Synonym for FIFO
49           $MCE::Shared::Queue::FILO    # Synonym for LIFO
50
51        # below, [ ... ] denotes optional parameters
52
53        $qu->await( [ $pending_threshold ] );
54        $qu->clear();
55        $qu->end();
56
57        $qu->enqueue( $item [, $item, ... ] );
58        $qu->enqueuep( $priority, $item [, $item, ... ] );
59
60        $item  = $qu->dequeue();
61        @items = $qu->dequeue( $count );
62        $item  = $qu->dequeue_nb();
63        @items = $qu->dequeue_nb( $count );
64
65        $qu->insert( $index, $item [, $item, ... ] );
66        $qu->insertp( $priority, $index, $item [, $item, ... ] );
67
68        $count = $qu->pending();
69        $item  = $qu->peek( [ $index ] );
70        $item  = $qu->peekp( $priority [, $index ] );
71        @array = $qu->heap();
72

API DOCUMENTATION

74       new ( [ options ] )
75          Constructs a new object. Supported options are queue, porder, type,
76          await, and fast.
77
78           # non-shared or local construction for use by a single process
79
80           use MCE::Shared::Queue;
81
82           $q1 = MCE::Shared::Queue->new();
83           $q2 = MCE::Shared::Queue->new( queue  => [ 0, 1, 2 ] );
84
85           $q3 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::HIGHEST );
86           $q4 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::LOWEST  );
87
88           $q5 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::FIFO );
89           $q6 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::LIFO );
90
91           $q7 = MCE::Shared::Queue->new( await  => 1 );
92           $q8 = MCE::Shared::Queue->new( fast   => 1 );
93
94           # construction for sharing with other threads and processes
95
96           use MCE::Shared;
97           use MCE::Shared::Queue;
98
99           $q1 = MCE::Shared->queue();
100           $q2 = MCE::Shared->queue( queue  => [ 0, 1, 2 ] );
101
102           $q3 = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
103           $q4 = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST  );
104
105           $q5 = MCE::Shared->queue( type   => $MCE::Shared::Queue::FIFO );
106           $q6 = MCE::Shared->queue( type   => $MCE::Shared::Queue::LIFO );
107
108           $q7 = MCE::Shared->queue( await  => 1 );
109           $q8 = MCE::Shared->queue( fast   => 1 );
110
111          The "await" option, when enabled, allows workers to block
112          (semaphore-like) until the number of items pending is equal to or less
113          than a threshold value.  The "await" method is described below.
114
115          The "fast" option speeds up dequeues and is not enabled by default.
116          It is beneficial for queues not calling (->clear or ->dequeue_nb)
117          and not altering the optional count value while running; e.g.
118          ->dequeue($count). Basically, do not enable 'fast' if varying the
119          count dynamically.
120
121       await ( pending_threshold )
122          Waits until the queue drops down to threshold items. The "await"
123          method is beneficial when wanting to throttle worker(s) appending to
124          the queue. Perhaps consumers are running a bit behind and you want to
125          prevent memory consumption from increasing too high. Below, the
126          number of items pending will never go above 20.
127
128           use Time::HiRes qw( sleep );
129
130           use MCE::Flow;
131           use MCE::Shared;
132
133           my $q = MCE::Shared->queue( await => 1, fast => 1 );
134           my ( $producers, $consumers ) = ( 1, 8 );
135
136           mce_flow {
137              task_name   => [ 'producer', 'consumer' ],
138              max_workers => [ $producers, $consumers ],
139           },
140           sub {
141              ## producer
142              for my $item ( 1 .. 100 ) {
143                 $q->enqueue($item);
144
145                 ## blocks until the # of items pending reaches <= 10
146                 if ($item % 10 == 0) {
147                    MCE->say( 'pending: '.$q->pending() );
148                    $q->await(10);
149                 }
150              }
151
152              ## notify consumers no more work
153              $q->end();
154
155           },
156           sub {
157              ## consumers
158              while (defined (my $next = $q->dequeue())) {
159                 MCE->say( MCE->task_wid().': '.$next );
160                 sleep 0.100;
161              }
162           };
163
164       clear ( )
165          Clears the queue of any items.
166
167           $q->clear;
168
169       end ( )
170          Stops the queue from receiving more items. Any worker blocking on
171          "dequeue" will be unblocked automatically. Subsequent calls to
172          "dequeue" will behave like "dequeue_nb". Current API available since
173          MCE::Shared 1.814.
174
175           $q->end();
176
177          MCE Models (e.g. MCE::Flow) may persist between runs. In that case,
178          one might want to enqueue "undef"'s versus calling "end". The number
179          of "undef"'s depends on how many items workers dequeue at a time.
180
181           $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
182           $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
183           $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items
184
185       enqueue ( item [, item, ... ] )
186          Appends a list of items onto the end of the normal queue.
187
188           $q->enqueue( 'foo' );
189           $q->enqueue( 'bar', 'baz' );
190
191       enqueuep ( priority, item [, item, ... ] )
192          Appends a list of items onto the end of the priority queue with
193          priority.
194
195           $q->enqueuep( $priority, 'foo' );
196           $q->enqueuep( $priority, 'bar', 'baz' );
197
198       dequeue ( [ count ] )
199          Returns the requested number of items (default 1) from the queue.
200          Priority data will always dequeue first before any data from the
201          normal queue.
202
203           $q->dequeue( 2 );
204           $q->dequeue; # default 1
205
206          The method will block if the queue contains zero items. If the queue
207          contains fewer than the requested number of items, the method will
208          not block, but return whatever items there are on the queue.
209
210          The $count, used for requesting the number of items, is beneficial
211          when workers are passing parameters through the queue. For this
212          reason, always remember to dequeue using the same multiple for the
213          count. This is unlike Thread::Queue which will block until the
214          requested number of items are available.
215
216           # MCE::Shared::Queue 1.816 and prior releases
217           while ( my @items = $q->dequeue(2) ) {
218              last unless ( defined $items[0] );
219              ...
220           }
221
222           # MCE::Shared::Queue 1.817 and later
223           while ( my @items = $q->dequeue(2) ) {
224              ...
225           }
226
227       dequeue_nb ( [ count ] )
228          Returns the requested number of items (default 1) from the queue.
229          Like with dequeue, priority data will always dequeue first. This
230          method is non-blocking and returns "undef" in the absence of data.
231
232           $q->dequeue_nb( 2 );
233           $q->dequeue_nb; # default 1
234
235       insert ( index, item [, item, ... ] )
236          Adds the list of items to the queue at the specified index position
237          (0 is the head of the list). The head of the queue is that item
238          which would be removed by a call to dequeue.
239
240           $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
241           $q->enqueue(1, 2, 3, 4);
242           $q->insert(1, 'foo', 'bar');
243           # Queue now contains: 1, foo, bar, 2, 3, 4
244
245           $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
246           $q->enqueue(1, 2, 3, 4);
247           $q->insert(1, 'foo', 'bar');
248           # Queue now contains: 1, 2, 3, foo, bar, 4
249
250       insertp ( priority, index, item [, item, ... ] )
251          Adds the list of items to the queue at the specified index position
252          with priority. The behavior is similar to "$q->insert" otherwise.
253
254       pending ( )
255          Returns the number of items in the queue. The count includes both
256          normal and priority data. Returns "undef" if the queue has been
257          ended, and there are no more items in the queue.
258
259           $q = MCE::Shared->queue();
260           $q->enqueuep(5, 'foo', 'bar');
261           $q->enqueue('sunny', 'day');
262
263           print $q->pending(), "\n";
264           # Output: 4
265
266       peek ( [ index ] )
267          Returns an item from the normal queue, at the specified index,
268          without dequeuing anything. It defaults to the head of the queue if
269          index is not specified. The head of the queue is that item which
270          would be removed by a call to dequeue. Negative index values are
271          supported, similarly to arrays.
272
273           $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
274           $q->enqueue(1, 2, 3, 4, 5);
275
276           print $q->peek(1), ' ', $q->peek(-2), "\n";
277           # Output: 2 4
278
279           $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
280           $q->enqueue(1, 2, 3, 4, 5);
281
282           print $q->peek(1), ' ', $q->peek(-2), "\n";
283           # Output: 4 2
284
285       peekp ( priority [, index ] )
286          Returns an item from the queue with priority, at the specified
287          index, without dequeuing anything. It defaults to the head of the
288          queue if index is not specified. The behavior is similar to
289          "$q->peek" otherwise.
290
291       peekh ( [ index ] )
292          Returns an item from the head of the heap or at the specified index.
293
294           $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
295           $q->enqueuep(5, 'foo');
296           $q->enqueuep(6, 'bar');
297           $q->enqueuep(4, 'sun');
298
299           print $q->peekh(0), "\n";
300           # Output: 6
301
302           $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
303           $q->enqueuep(5, 'foo');
304           $q->enqueuep(6, 'bar');
305           $q->enqueuep(4, 'sun');
306
307           print $q->peekh(0), "\n";
308           # Output: 4
309
310       heap ( )
311          Returns an array containing the heap data. Heap data consists of
312          priority numbers, not the data.
313
314           @h = $q->heap;   # $MCE::Shared::Queue::HIGHEST
315           # Heap contains: 6, 5, 4
316
317           @h = $q->heap;   # $MCE::Shared::Queue::LOWEST
318           # Heap contains: 4, 5, 6
319

ACKNOWLEDGMENTS

321       List::BinarySearch
322          The bsearch_num_pos method was helpful for accommodating the highest
323          and lowest order in MCE::Shared::Queue.
324
325       POE::Queue::Array
326          For extra optimization, two if statements were adopted for checking
327          if the item belongs at the end or head of the queue.
328
329       List::Priority
330          MCE::Shared::Queue supports both normal and priority queues.
331
332       Thread::Queue
333          Thread::Queue is used as a template for identifying and documenting
334          the methods.  MCE::Shared::Queue is not fully compatible due to
335          supporting normal and priority queues simultaneously; e.g.
336
337           $q->enqueue( $item [, $item, ... ] );         # normal queue
338           $q->enqueuep( $p, $item [, $item, ... ] );    # priority queue
339
340           $q->dequeue( [ $count ] );      # priority data dequeues first
341           $q->dequeue_nb( [ $count ] );
342
343           $q->pending();                  # counts both normal/priority queues
344

LIMITATIONS

346       Perl must have IO::FDPass for constructing a shared "condvar" or
347       "queue" while the shared-manager process is running. For platforms
348       where IO::FDPass isn't possible, construct "condvar" and "queue" before
349       other classes.  On systems without "IO::FDPass", the manager process is
350       delayed until sharing other classes or started explicitly.
351
352        use MCE::Shared;
353
354        my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;
355
356        my $cv  = MCE::Shared->condvar();
357        my $que = MCE::Shared->queue();
358
359        MCE::Shared->start() unless $has_IO_FDPass;
360
361       Regarding mce_open, "IO::FDPass" is needed for constructing a shared-
362       handle from a non-shared handle not yet available inside the shared-
363       manager process.  The workaround is to have the non-shared handle made
364       before the shared-manager is started. Passing a file by reference is
365       fine for the three STD* handles.
366
367        # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.
368
369        mce_open my $shared_in,  "<",  \*STDIN;   # ok
370        mce_open my $shared_out, ">>", \*STDOUT;  # ok
371        mce_open my $shared_err, ">>", \*STDERR;  # ok
372        mce_open my $shared_fh1, "<",  "/path/to/sequence.fasta";  # ok
373        mce_open my $shared_fh2, ">>", "/path/to/results.log";     # ok
374
375        mce_open my $shared_fh, ">>", \*NON_SHARED_FH;  # requires IO::FDPass
376
377       The IO::FDPass module is known to work reliably on most platforms.
378       Install 1.1 or later to be rid of the limitations described above.
379
380        perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"
381

INDEX

383       MCE, MCE::Hobo, MCE::Shared
384

AUTHOR

386       Mario E. Roy, <marioeroy AT gmail DOT com>
387
388
389
390perl v5.28.1                      2019-01-04             MCE::Shared::Queue(3)
Impressum