1MCE::Shared::Queue(3) User Contributed Perl DocumentationMCE::Shared::Queue(3)
2
3
4

NAME

6       MCE::Shared::Queue - Hybrid-queue helper class
7

VERSION

9       This document describes MCE::Shared::Queue version 1.886
10

DESCRIPTION

12       A queue helper class for use either standalone or managed by MCE::Shared.
13
14       This module is mostly compatible with MCE::Queue except for the
15       "gather" option which is not supported in this context. It provides a
16       queue interface supporting normal and priority queues. Data from shared
17       queues reside under the shared-manager process, otherwise locally.
18

SYNOPSIS

20        # non-shared or local construction for use by a single process
21
22        use MCE::Shared::Queue;
23
24        my $qu = MCE::Shared::Queue->new( await => 1, queue => [ "." ] );
25
26        # construction for sharing with other threads and processes
27
28        use MCE::Shared;
29        use MCE::Shared::Queue;
30
31        my $qu = MCE::Shared->queue(
32           porder => $MCE::Shared::Queue::HIGHEST,
33           type   => $MCE::Shared::Queue::FIFO,
34        );
35
36        # possible values for "porder" and "type"
37
38        porder =>
39           $MCE::Shared::Queue::HIGHEST # Highest priority items dequeue first
40           $MCE::Shared::Queue::LOWEST  # Lowest priority items dequeue first
41
42        type =>
43           $MCE::Shared::Queue::FIFO    # First in, first out
44           $MCE::Shared::Queue::LIFO    # Last in, first out
45           $MCE::Shared::Queue::LILO    # Synonym for FIFO
46           $MCE::Shared::Queue::FILO    # Synonym for LIFO
47
48        # below, [ ... ] denotes optional parameters
49
50        $qu->await( [ $pending_threshold ] );
51        $qu->clear();
52        $qu->end();
53
54        $qu->enqueue( $item [, $item, ... ] );
55        $qu->enqueuep( $priority, $item [, $item, ... ] );
56
57        $item  = $qu->dequeue();
58        @items = $qu->dequeue( $count );
59        $item  = $qu->dequeue_nb();
60        @items = $qu->dequeue_nb( $count );
61        $item  = $qu->dequeue_timed( $timeout );
62        @items = $qu->dequeue_timed( $timeout, $count );
63
64        $qu->insert( $index, $item [, $item, ... ] );
65        $qu->insertp( $priority, $index, $item [, $item, ... ] );
66
67        $count = $qu->pending();
68        $item  = $qu->peek( [ $index ] );
69        $item  = $qu->peekp( $priority [, $index ] );
70        @array = $qu->heap();
71

API DOCUMENTATION

73   MCE::Shared::Queue->new ( [ options ] )
74   MCE::Shared->queue ( [ options ] )
75       Constructs a new object. Supported options are queue, porder, type, and
76       await.  Note: The barrier and fast options are silently ignored (no-
77       op) if specified; starting with 1.867.
78
79        # non-shared or local construction for use by a single process
80
81        use MCE::Shared::Queue;
82
83        $q1 = MCE::Shared::Queue->new();
84        $q2 = MCE::Shared::Queue->new( queue  => [ 0, 1, 2 ] );
85
86        $q3 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::HIGHEST );
87        $q4 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::LOWEST  );
88
89        $q5 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::FIFO );
90        $q6 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::LIFO );
91
92        $q7 = MCE::Shared::Queue->new( await  => 1, barrier => 0 );
93        $q8 = MCE::Shared::Queue->new( fast   => 1 );
94
95        # construction for sharing with other threads and processes
96
97        use MCE::Shared;
98        use MCE::Shared::Queue;
99
100        $q1 = MCE::Shared->queue();
101        $q2 = MCE::Shared->queue( queue  => [ 0, 1, 2 ] );
102
103        $q3 = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
104        $q4 = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST  );
105
106        $q5 = MCE::Shared->queue( type   => $MCE::Shared::Queue::FIFO );
107        $q6 = MCE::Shared->queue( type   => $MCE::Shared::Queue::LIFO );
108
109        $q7 = MCE::Shared->queue( await  => 1, barrier => 0 );
110        $q8 = MCE::Shared->queue( fast   => 1 );
111
112       The "await" option, when enabled, allows workers to block (semaphore-
113       like) until the number of items pending is less than or equal to a
114       threshold value.  The "await" method is described below.
115
116       Obsolete: On Unix platforms, "barrier" mode (enabled by default)
117       prevents many workers from dequeuing simultaneously to lessen overhead
118       for the OS kernel.  Specify 0 to disable barrier mode and not allocate
119       sockets. The barrier option has no effect if constructing the queue
120       inside a thread or enabling "fast".
121
122       Obsolete: The "fast" option speeds up dequeues and is not enabled by
123       default.  It is beneficial for queues not calling (->dequeue_nb) and
124       not altering the count value while running; e.g. ->dequeue($count).
125
126   await ( pending_threshold )
127       Waits until the queue drops down to threshold items. The "await" method
128       is beneficial when wanting to throttle worker(s) appending to the
129       queue. Perhaps, consumers are running a bit behind and wanting to prevent
130       memory consumption from increasing too high. Below, the number of items
131       pending will never go above 20.
132
133        use Time::HiRes qw( sleep );
134
135        use MCE::Flow;
136        use MCE::Shared;
137
138        my $q = MCE::Shared->queue( await => 1, fast => 1 );
139        my ( $producers, $consumers ) = ( 1, 8 );
140
141        mce_flow {
142           task_name   => [ 'producer', 'consumer' ],
143           max_workers => [ $producers, $consumers ],
144        },
145        sub {
146           ## producer
147           for my $item ( 1 .. 100 ) {
148              $q->enqueue($item);
149
150              ## blocks until the # of items pending reaches <= 10
151              if ($item % 10 == 0) {
152                 MCE->say( 'pending: '.$q->pending() );
153                 $q->await(10);
154              }
155           }
156
157           ## notify consumers no more work
158           $q->end();
159
160        },
161        sub {
162           ## consumers
163           while (defined (my $next = $q->dequeue())) {
164              MCE->say( MCE->task_wid().': '.$next );
165              sleep 0.100;
166           }
167        };
168
169   clear ( )
170       Clears the queue of any items.
171
172        $q->clear;
173
174   end ( )
175       Stops the queue from receiving more items. Any worker blocking on
176       "dequeue" will be unblocked automatically. Subsequent calls to
177       "dequeue" will behave like "dequeue_nb". Current API available since
178       MCE::Shared 1.814.
179
180        $q->end();
181
182       MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one
183       might want to enqueue "undef"'s versus calling "end". The number of
184       "undef"'s depends on how many items workers dequeue at a time.
185
186        $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
187        $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
188        $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items
189
190   enqueue ( item [, item, ... ] )
191       Appends a list of items onto the end of the normal queue.
192
193        $q->enqueue( 'foo' );
194        $q->enqueue( 'bar', 'baz' );
195
196   enqueuep ( priority, item [, item, ... ] )
197       Appends a list of items onto the end of the priority queue with
198       priority.
199
200        $q->enqueuep( $priority, 'foo' );
201        $q->enqueuep( $priority, 'bar', 'baz' );
202
203   dequeue ( [ count ] )
204       Returns the requested number of items (default 1) from the queue.
205       Priority data will always dequeue first before any data from the normal
206       queue.
207
208        $q->dequeue;
209        $q->dequeue( 2 );
210
211       The method will block if the queue contains zero items. If the queue
212       contains fewer than the requested number of items, the method will not
213       block, but return whatever items there are on the queue.
214
215       The $count, used for requesting the number of items, is beneficial when
216       workers are passing parameters through the queue. For this reason,
217       always remember to dequeue using the same multiple for the count. This
218       is unlike Thread::Queue which will block until the requested number of
219       items are available.
220
221        # MCE::Shared::Queue 1.816 and prior releases
222        while ( my @items = $q->dequeue(2) ) {
223           last unless ( defined $items[0] );
224           ...
225        }
226
227        # MCE::Shared::Queue 1.817 and later
228        while ( my @items = $q->dequeue(2) ) {
229           ...
230        }
231
232   dequeue_nb ( [ count ] )
233       Returns the requested number of items (default 1) from the queue. Like
234       with dequeue, priority data will always dequeue first. This method is
235       non-blocking and returns "undef" in the absence of data.
236
237        $q->dequeue_nb;
238        $q->dequeue_nb( 2 );
239
240   dequeue_timed ( timeout [, $count ] )
241       Returns the requested number of items (default 1) from the queue. Like
242       with dequeue, priority data will always dequeue first. This method is
243       blocking until the timeout is reached and returns "undef" in the
244       absence of data.  Current API available since MCE::Shared 1.882.
245
246        $q->dequeue_timed( 300 );    # timeout after 5 minutes
247        $q->dequeue_timed( 300, 2 );
248
249       The timeout may be specified as fractional seconds. If timeout is
250       missing, undef, or less than or equal to 0, or if the queue is a non-
251       shared object, then this call behaves like dequeue_nb.
252
253   insert ( index, item [, item, ... ] )
254       Adds the list of items to the queue at the specified index position (0
255       is the head of the list). The head of the queue is that item which
256       would be removed by a call to dequeue.
257
258        $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
259        $q->enqueue(1, 2, 3, 4);
260        $q->insert(1, 'foo', 'bar');
261        # Queue now contains: 1, foo, bar, 2, 3, 4
262
263        $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
264        $q->enqueue(1, 2, 3, 4);
265        $q->insert(1, 'foo', 'bar');
266        # Queue now contains: 1, 2, 3, 'foo', 'bar', 4
267
268   insertp ( priority, index, item [, item, ... ] )
269       Adds the list of items to the queue at the specified index position
270       with priority. The behavior is similar to "$q->insert" otherwise.
271
272   pending ( )
273       Returns the number of items in the queue. The count includes both
274       normal and priority data. Returns "undef" if the queue has been ended,
275       and there are no more items in the queue.
276
277        $q = MCE::Shared->queue();
278        $q->enqueuep(5, 'foo', 'bar');
279        $q->enqueue('sunny', 'day');
280
281        print $q->pending(), "\n";
282        # Output: 4
283
284   peek ( [ index ] )
285       Returns an item from the normal queue, at the specified index, without
286       dequeuing anything. It defaults to the head of the queue if index is
287       not specified. The head of the queue is that item which would be
288       removed by a call to dequeue. Negative index values are supported,
289       similarly to arrays.
290
291        $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
292        $q->enqueue(1, 2, 3, 4, 5);
293
294        print $q->peek(1), ' ', $q->peek(-2), "\n";
295        # Output: 2 4
296
297        $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
298        $q->enqueue(1, 2, 3, 4, 5);
299
300        print $q->peek(1), ' ', $q->peek(-2), "\n";
301        # Output: 4 2
302
303   peekp ( priority [, index ] )
304       Returns an item from the queue with priority, at the specified index,
305       without dequeuing anything. It defaults to the head of the queue if
306       index is not specified. The behavior is similar to "$q->peek"
307       otherwise.
308
309   peekh ( [ index ] )
310       Returns an item from the head of the heap or at the specified index.
311
312        $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
313        $q->enqueuep(5, 'foo');
314        $q->enqueuep(6, 'bar');
315        $q->enqueuep(4, 'sun');
316
317        print $q->peekh(0), "\n";
318        # Output: 6
319
320        $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
321        $q->enqueuep(5, 'foo');
322        $q->enqueuep(6, 'bar');
323        $q->enqueuep(4, 'sun');
324
325        print $q->peekh(0), "\n";
326        # Output: 4
327
328   heap ( )
329       Returns an array containing the heap data. Heap data consists of
330       priority numbers, not the data.
331
332        @h = $q->heap;   # $MCE::Shared::Queue::HIGHEST
333        # Heap contains: 6, 5, 4
334
335        @h = $q->heap;   # $MCE::Shared::Queue::LOWEST
336        # Heap contains: 4, 5, 6
337

ACKNOWLEDGMENTS

339       •  List::BinarySearch
340
341          The bsearch_num_pos method was helpful for accommodating the highest
342          and lowest order in MCE::Shared::Queue.
343
344       •  POE::Queue::Array
345
346          For extra optimization, two if statements were adopted for checking
347          if the item belongs at the end or head of the queue.
348
349       •  List::Priority
350
351          MCE::Shared::Queue supports both normal and priority queues.
352
353       •  Thread::Queue
354
355          Thread::Queue is used as a template for identifying and documenting
356          the methods.  MCE::Shared::Queue is not fully compatible due to
357          supporting normal and priority queues simultaneously; e.g.
358
359           $q->enqueue( $item [, $item, ... ] );         # normal queue
360           $q->enqueuep( $p, $item [, $item, ... ] );    # priority queue
361
362           $q->dequeue( [ $count ] );      # priority data dequeues first
363           $q->dequeue_nb( [ $count ] );
364
365           $q->pending();                  # counts both normal/priority queues
366

LIMITATIONS

368       Perl must have IO::FDPass for constructing a shared "condvar" or
369       "queue" while the shared-manager process is running. For platforms
370       where IO::FDPass isn't possible, construct "condvar" and "queue" before
371       other classes.  On systems without "IO::FDPass", the manager process is
372       delayed until sharing other classes or started explicitly.
373
374        use MCE::Shared;
375
376        my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;
377
378        my $cv  = MCE::Shared->condvar();
379        my $que = MCE::Shared->queue();
380
381        MCE::Shared->start() unless $has_IO_FDPass;
382
383       Regarding mce_open, "IO::FDPass" is needed for constructing a shared-
384       handle from a non-shared handle not yet available inside the shared-
385       manager process.  The workaround is to have the non-shared handle made
386       before the shared-manager is started. Passing a file by reference is
387       fine for the three STD* handles.
388
389        # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.
390
391        mce_open my $shared_in,  "<",  \*STDIN;   # ok
392        mce_open my $shared_out, ">>", \*STDOUT;  # ok
393        mce_open my $shared_err, ">>", \*STDERR;  # ok
394        mce_open my $shared_fh1, "<",  "/path/to/sequence.fasta";  # ok
395        mce_open my $shared_fh2, ">>", "/path/to/results.log";     # ok
396
397        mce_open my $shared_fh, ">>", \*NON_SHARED_FH;  # requires IO::FDPass
398
399       The IO::FDPass module is known to work reliably on most platforms.
400       Install 1.1 or later to be rid of the limitations described above.
401
402        perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"
403

INDEX

405       MCE, MCE::Hobo, MCE::Shared
406

AUTHOR

408       Mario E. Roy, <marioeroy AT gmail DOT com>
409
410
411
412perl v5.38.0                      2023-09-14             MCE::Shared::Queue(3)
Impressum