1MCE::Shared::Queue(3) User Contributed Perl DocumentationMCE::Shared::Queue(3)
2
3
4

NAME

6       MCE::Shared::Queue - Hybrid-queue helper class
7

VERSION

9       This document describes MCE::Shared::Queue version 1.873
10

DESCRIPTION

12       A queue helper class for use standalone or managed by MCE::Shared.
13
14       This module is mostly compatible with MCE::Queue except for the
15       "gather" option which is not supported in this context. It provides a
16       queue interface supporting normal and priority queues. Data from shared
17       queues reside under the shared-manager process, otherwise locally.
18

SYNOPSIS

20        # non-shared or local construction for use by a single process
21
22        use MCE::Shared::Queue;
23
24        my $qu = MCE::Shared::Queue->new( await => 1, queue => [ "." ] );
25
26        # construction for sharing with other threads and processes
27
28        use MCE::Shared;
29        use MCE::Shared::Queue;
30
31        my $qu = MCE::Shared->queue(
32           porder => $MCE::Shared::Queue::HIGHEST,
33           type   => $MCE::Shared::Queue::FIFO,
34        );
35
36        # possible values for "porder" and "type"
37
38        porder =>
39           $MCE::Shared::Queue::HIGHEST # Highest priority items dequeue first
40           $MCE::Shared::Queue::LOWEST  # Lowest priority items dequeue first
41
42        type =>
43           $MCE::Shared::Queue::FIFO    # First in, first out
44           $MCE::Shared::Queue::LIFO    # Last in, first out
45           $MCE::Shared::Queue::LILO    # Synonym for FIFO
46           $MCE::Shared::Queue::FILO    # Synonym for LIFO
47
48        # below, [ ... ] denotes optional parameters
49
50        $qu->await( [ $pending_threshold ] );
51        $qu->clear();
52        $qu->end();
53
54        $qu->enqueue( $item [, $item, ... ] );
55        $qu->enqueuep( $priority, $item [, $item, ... ] );
56
57        $item  = $qu->dequeue();
58        @items = $qu->dequeue( $count );
59        $item  = $qu->dequeue_nb();
60        @items = $qu->dequeue_nb( $count );
61
62        $qu->insert( $index, $item [, $item, ... ] );
63        $qu->insertp( $priority, $index, $item [, $item, ... ] );
64
65        $count = $qu->pending();
66        $item  = $qu->peek( [ $index ] );
67        $item  = $qu->peekp( $priority [, $index ] );
68        @array = $qu->heap();
69

API DOCUMENTATION

71   MCE::Shared::Queue->new ( [ options ] )
72   MCE::Shared->queue ( [ options ] )
73       Constructs a new object. Supported options are queue, porder, type, and
74       await.  Note: The barrier and fast options are silently ignored (no-
75       op) if specified; starting with 1.867.
76
77        # non-shared or local construction for use by a single process
78
79        use MCE::Shared::Queue;
80
81        $q1 = MCE::Shared::Queue->new();
82        $q2 = MCE::Shared::Queue->new( queue  => [ 0, 1, 2 ] );
83
84        $q3 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::HIGHEST );
85        $q4 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::LOWEST  );
86
87        $q5 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::FIFO );
88        $q6 = MCE::Shared::Queue->new( type   => $MCE::Shared::Queue::LIFO );
89
90        $q7 = MCE::Shared::Queue->new( await  => 1, barrier => 0 );
91        $q8 = MCE::Shared::Queue->new( fast   => 1 );
92
93        # construction for sharing with other threads and processes
94
95        use MCE::Shared;
96        use MCE::Shared::Queue;
97
98        $q1 = MCE::Shared->queue();
99        $q2 = MCE::Shared->queue( queue  => [ 0, 1, 2 ] );
100
101        $q3 = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
102        $q4 = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST  );
103
104        $q5 = MCE::Shared->queue( type   => $MCE::Shared::Queue::FIFO );
105        $q6 = MCE::Shared->queue( type   => $MCE::Shared::Queue::LIFO );
106
107        $q7 = MCE::Shared->queue( await  => 1, barrier => 0 );
108        $q8 = MCE::Shared->queue( fast   => 1 );
109
110       The "await" option, when enabled, allows workers to block (semaphore-
111       like) until the number of items pending is equal to or less than a
112       threshold value.  The "await" method is described below.
113
114       Obsolete: On Unix platforms, "barrier" mode (enabled by default)
115       prevents many workers from dequeuing simultaneously to lessen overhead
116       for the OS kernel.  Specify 0 to disable barrier mode and not allocate
117       sockets. The barrier option has no effect if constructing the queue
118       inside a thread or enabling "fast".
119
120       Obsolete: The "fast" option speeds up dequeues and is not enabled by
121       default.  It is beneficial for queues not calling (->dequeue_nb) and
122       not altering the count value while running; e.g. ->dequeue($count).
123
124   await ( pending_threshold )
125       Waits until the queue drops down to threshold items. The "await" method
126       is beneficial when wanting to throttle worker(s) appending to the
127       queue. Perhaps, consumers are running a bit behind and wanting to prevent
128       memory consumption from increasing too high. Below, the number of items
129       pending will never go above 20.
130
131        use Time::HiRes qw( sleep );
132
133        use MCE::Flow;
134        use MCE::Shared;
135
136        my $q = MCE::Shared->queue( await => 1, fast => 1 );
137        my ( $producers, $consumers ) = ( 1, 8 );
138
139        mce_flow {
140           task_name   => [ 'producer', 'consumer' ],
141           max_workers => [ $producers, $consumers ],
142        },
143        sub {
144           ## producer
145           for my $item ( 1 .. 100 ) {
146              $q->enqueue($item);
147
148              ## blocks until the # of items pending reaches <= 10
149              if ($item % 10 == 0) {
150                 MCE->say( 'pending: '.$q->pending() );
151                 $q->await(10);
152              }
153           }
154
155           ## notify consumers no more work
156           $q->end();
157
158        },
159        sub {
160           ## consumers
161           while (defined (my $next = $q->dequeue())) {
162              MCE->say( MCE->task_wid().': '.$next );
163              sleep 0.100;
164           }
165        };
166
167   clear ( )
168       Clears the queue of any items.
169
170        $q->clear;
171
172   end ( )
173       Stops the queue from receiving more items. Any worker blocking on
174       "dequeue" will be unblocked automatically. Subsequent calls to
175       "dequeue" will behave like "dequeue_nb". Current API available since
176       MCE::Shared 1.814.
177
178        $q->end();
179
180       MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one
181       might want to enqueue "undef"'s versus calling "end". The number of
182       "undef"'s depends on how many items workers dequeue at a time.
183
184        $q->enqueue((undef) x ($N_workers * 1));  # $q->dequeue()   1 item
185        $q->enqueue((undef) x ($N_workers * 2));  # $q->dequeue(2)  2 items
186        $q->enqueue((undef) x ($N_workers * N));  # $q->dequeue(N)  N items
187
188   enqueue ( item [, item, ... ] )
189       Appends a list of items onto the end of the normal queue.
190
191        $q->enqueue( 'foo' );
192        $q->enqueue( 'bar', 'baz' );
193
194   enqueuep ( priority, item [, item, ... ] )
195       Appends a list of items onto the end of the priority queue with
196       priority.
197
198        $q->enqueuep( $priority, 'foo' );
199        $q->enqueuep( $priority, 'bar', 'baz' );
200
201   dequeue ( [ count ] )
202       Returns the requested number of items (default 1) from the queue.
203       Priority data will always dequeue first before any data from the normal
204       queue.
205
206        $q->dequeue( 2 );
207        $q->dequeue; # default 1
208
209       The method will block if the queue contains zero items. If the queue
210       contains fewer than the requested number of items, the method will not
211       block, but return whatever items there are on the queue.
212
213       The $count, used for requesting the number of items, is beneficial when
214       workers are passing parameters through the queue. For this reason,
215       always remember to dequeue using the same multiple for the count. This
216       is unlike Thread::Queue which will block until the requested number of
217       items are available.
218
219        # MCE::Shared::Queue 1.816 and prior releases
220        while ( my @items = $q->dequeue(2) ) {
221           last unless ( defined $items[0] );
222           ...
223        }
224
225        # MCE::Shared::Queue 1.817 and later
226        while ( my @items = $q->dequeue(2) ) {
227           ...
228        }
229
230   dequeue_nb ( [ count ] )
231       Returns the requested number of items (default 1) from the queue. Like
232       with dequeue, priority data will always dequeue first. This method is
233       non-blocking and returns "undef" in the absence of data.
234
235        $q->dequeue_nb( 2 );
236        $q->dequeue_nb; # default 1
237
238   insert ( index, item [, item, ... ] )
239       Adds the list of items to the queue at the specified index position (0
240       is the head of the list). The head of the queue is that item which
241       would be removed by a call to dequeue.
242
243        $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
244        $q->enqueue(1, 2, 3, 4);
245        $q->insert(1, 'foo', 'bar');
246        # Queue now contains: 1, foo, bar, 2, 3, 4
247
248        $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
249        $q->enqueue(1, 2, 3, 4);
250        $q->insert(1, 'foo', 'bar');
251        # Queue now contains: 1, 2, 3, 'foo', 'bar', 4
252
253   insertp ( priority, index, item [, item, ... ] )
254       Adds the list of items to the queue at the specified index position
255       with priority. The behavior is similar to "$q->insert" otherwise.
256
257   pending ( )
258       Returns the number of items in the queue. The count includes both
259       normal and priority data. Returns "undef" if the queue has been ended,
260       and there are no more items in the queue.
261
262        $q = MCE::Shared->queue();
263        $q->enqueuep(5, 'foo', 'bar');
264        $q->enqueue('sunny', 'day');
265
266        print $q->pending(), "\n";
267        # Output: 4
268
269   peek ( [ index ] )
270       Returns an item from the normal queue, at the specified index, without
271       dequeuing anything. It defaults to the head of the queue if index is
272       not specified. The head of the queue is that item which would be
273       removed by a call to dequeue. Negative index values are supported,
274       similarly to arrays.
275
276        $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
277        $q->enqueue(1, 2, 3, 4, 5);
278
279        print $q->peek(1), ' ', $q->peek(-2), "\n";
280        # Output: 2 4
281
282        $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
283        $q->enqueue(1, 2, 3, 4, 5);
284
285        print $q->peek(1), ' ', $q->peek(-2), "\n";
286        # Output: 4 2
287
288   peekp ( priority [, index ] )
289       Returns an item from the queue with priority, at the specified index,
290       without dequeuing anything. It defaults to the head of the queue if
291       index is not specified. The behavior is similar to "$q->peek"
292       otherwise.
293
294   peekh ( [ index ] )
295       Returns an item from the head of the heap or at the specified index.
296
297        $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
298        $q->enqueuep(5, 'foo');
299        $q->enqueuep(6, 'bar');
300        $q->enqueuep(4, 'sun');
301
302        print $q->peekh(0), "\n";
303        # Output: 6
304
305        $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
306        $q->enqueuep(5, 'foo');
307        $q->enqueuep(6, 'bar');
308        $q->enqueuep(4, 'sun');
309
310        print $q->peekh(0), "\n";
311        # Output: 4
312
313   heap ( )
314       Returns an array containing the heap data. Heap data consists of
315       priority numbers, not the data.
316
317        @h = $q->heap;   # $MCE::Shared::Queue::HIGHEST
318        # Heap contains: 6, 5, 4
319
320        @h = $q->heap;   # $MCE::Shared::Queue::LOWEST
321        # Heap contains: 4, 5, 6
322

ACKNOWLEDGMENTS

324       •  List::BinarySearch
325
326          The bsearch_num_pos method was helpful for accommodating the highest
327          and lowest order in MCE::Shared::Queue.
328
329       •  POE::Queue::Array
330
331          For extra optimization, two if statements were adopted for checking
332          if the item belongs at the end or head of the queue.
333
334       •  List::Priority
335
336          MCE::Shared::Queue supports both normal and priority queues.
337
338       •  Thread::Queue
339
340          Thread::Queue is used as a template for identifying and documenting
341          the methods.  MCE::Shared::Queue is not fully compatible due to
342          supporting normal and priority queues simultaneously; e.g.
343
344           $q->enqueue( $item [, $item, ... ] );         # normal queue
345           $q->enqueuep( $p, $item [, $item, ... ] );    # priority queue
346
347           $q->dequeue( [ $count ] );      # priority data dequeues first
348           $q->dequeue_nb( [ $count ] );
349
350           $q->pending();                  # counts both normal/priority queues
351

LIMITATIONS

353       Perl must have IO::FDPass for constructing a shared "condvar" or
354       "queue" while the shared-manager process is running. For platforms
355       where IO::FDPass isn't possible, construct "condvar" and "queue" before
356       other classes.  On systems without "IO::FDPass", the manager process is
357       delayed until sharing other classes or started explicitly.
358
359        use MCE::Shared;
360
361        my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;
362
363        my $cv  = MCE::Shared->condvar();
364        my $que = MCE::Shared->queue();
365
366        MCE::Shared->start() unless $has_IO_FDPass;
367
368       Regarding mce_open, "IO::FDPass" is needed for constructing a shared-
369       handle from a non-shared handle not yet available inside the shared-
370       manager process.  The workaround is to have the non-shared handle made
371       before the shared-manager is started. Passing a file by reference is
372       fine for the three STD* handles.
373
374        # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.
375
376        mce_open my $shared_in,  "<",  \*STDIN;   # ok
377        mce_open my $shared_out, ">>", \*STDOUT;  # ok
378        mce_open my $shared_err, ">>", \*STDERR;  # ok
379        mce_open my $shared_fh1, "<",  "/path/to/sequence.fasta";  # ok
380        mce_open my $shared_fh2, ">>", "/path/to/results.log";     # ok
381
382        mce_open my $shared_fh, ">>", \*NON_SHARED_FH;  # requires IO::FDPass
383
384       The IO::FDPass module is known to work reliably on most platforms.
385       Install 1.1 or later to be rid of the limitations described above.
386
387        perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"
388

INDEX

390       MCE, MCE::Hobo, MCE::Shared
391

AUTHOR

393       Mario E. Roy, <marioeroy AT gmail DOT com>
394
395
396
397perl v5.32.1                      2021-01-27             MCE::Shared::Queue(3)
Impressum