1MCE::Shared::Queue(3) User Contributed Perl Documentation MCE::Shared::Queue(3)
2
3
4
6 MCE::Shared::Queue - Hybrid-queue helper class
7
9 This document describes MCE::Shared::Queue version 1.862
10
12 A queue helper class for use as a standalone or managed by MCE::Shared.
13
14 This module is mostly compatible with MCE::Queue except for the
15 "gather" option which is not supported in this context. It provides a
16 queue interface supporting normal and priority queues. Data from shared
17 queues reside under the shared-manager process, otherwise locally.
18
20 # non-shared or local construction for use by a single process
21
22 use MCE::Shared::Queue;
23
24 my $qu = MCE::Shared::Queue->new(
25 await => 1, fast => 0, queue => [ "." ]
26 );
27
28 # construction for sharing with other threads and processes
29
30 use MCE::Shared;
31 use MCE::Shared::Queue;
32
33 my $qu = MCE::Shared->queue(
34 porder => $MCE::Shared::Queue::HIGHEST,
35 type => $MCE::Shared::Queue::FIFO,
36 fast => 0
37 );
38
39 # possible values for "porder" and "type"
40
41 porder =>
42 $MCE::Shared::Queue::HIGHEST # Highest priority items dequeue first
43 $MCE::Shared::Queue::LOWEST # Lowest priority items dequeue first
44
45 type =>
46 $MCE::Shared::Queue::FIFO # First in, first out
47 $MCE::Shared::Queue::LIFO # Last in, first out
48 $MCE::Shared::Queue::LILO # Synonym for FIFO
49 $MCE::Shared::Queue::FILO # Synonym for LIFO
50
51 # below, [ ... ] denotes optional parameters
52
53 $qu->await( [ $pending_threshold ] );
54 $qu->clear();
55 $qu->end();
56
57 $qu->enqueue( $item [, $item, ... ] );
58 $qu->enqueuep( $priority, $item [, $item, ... ] );
59
60 $item = $qu->dequeue();
61 @items = $qu->dequeue( $count );
62 $item = $qu->dequeue_nb();
63 @items = $qu->dequeue_nb( $count );
64
65 $qu->insert( $index, $item [, $item, ... ] );
66 $qu->insertp( $priority, $index, $item [, $item, ... ] );
67
68 $count = $qu->pending();
69 $item = $qu->peek( [ $index ] );
70 $item = $qu->peekp( $priority [, $index ] );
71 @array = $qu->heap();
72
74 MCE::Shared::Queue->new ( [ options ] )
75 MCE::Shared->queue ( [ options ] )
76 Constructs a new object. Supported options are queue, porder, type,
77 await, barrier, and fast.
78
79 # non-shared or local construction for use by a single process
80
81 use MCE::Shared::Queue;
82
83 $q1 = MCE::Shared::Queue->new();
84 $q2 = MCE::Shared::Queue->new( queue => [ 0, 1, 2 ] );
85
86 $q3 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::HIGHEST );
87 $q4 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::LOWEST );
88
89 $q5 = MCE::Shared::Queue->new( type => $MCE::Shared::Queue::FIFO );
90 $q6 = MCE::Shared::Queue->new( type => $MCE::Shared::Queue::LIFO );
91
92 $q7 = MCE::Shared::Queue->new( await => 1, barrier => 0 );
93 $q8 = MCE::Shared::Queue->new( fast => 1 );
94
95 # construction for sharing with other threads and processes
96
97 use MCE::Shared;
98 use MCE::Shared::Queue;
99
100 $q1 = MCE::Shared->queue();
101 $q2 = MCE::Shared->queue( queue => [ 0, 1, 2 ] );
102
103 $q3 = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
104 $q4 = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
105
106 $q5 = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
107 $q6 = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
108
109 $q7 = MCE::Shared->queue( await => 1, barrier => 0 );
110 $q8 = MCE::Shared->queue( fast => 1 );
111
112 The "await" option, when enabled, allows workers to block (semaphore-
113 like) until the number of items pending is equal to or less than a
114 threshold value. The "await" method is described below.
115
116 On Unix platforms, "barrier" mode (enabled by default) prevents many
117 workers from dequeuing simultaneously to lessen overhead for the OS
118 kernel. Specify 0 to disable barrier mode and not allocate sockets. The
119 barrier option has no effect if constructing the queue inside a thread
120 or enabling "fast".
121
122 The "fast" option speeds up dequeues and is not enabled by default. It
123 is beneficial for queues not calling (->dequeue_nb) and not altering
124 the count value while running; e.g. ->dequeue($count).
125
126 await ( pending_threshold )
127 Waits until the queue drops down to threshold items. The "await" method
128 is beneficial when wanting to throttle worker(s) appending to the
129 queue. Perhaps consumers are running a bit behind and you want to prevent
130 memory consumption from increasing too high. Below, the number of items
131 pending will never go above 20.
132
133 use Time::HiRes qw( sleep );
134
135 use MCE::Flow;
136 use MCE::Shared;
137
138 my $q = MCE::Shared->queue( await => 1, fast => 1 );
139 my ( $producers, $consumers ) = ( 1, 8 );
140
141 mce_flow {
142 task_name => [ 'producer', 'consumer' ],
143 max_workers => [ $producers, $consumers ],
144 },
145 sub {
146 ## producer
147 for my $item ( 1 .. 100 ) {
148 $q->enqueue($item);
149
150 ## blocks until the # of items pending reaches <= 10
151 if ($item % 10 == 0) {
152 MCE->say( 'pending: '.$q->pending() );
153 $q->await(10);
154 }
155 }
156
157 ## notify consumers no more work
158 $q->end();
159
160 },
161 sub {
162 ## consumers
163 while (defined (my $next = $q->dequeue())) {
164 MCE->say( MCE->task_wid().': '.$next );
165 sleep 0.100;
166 }
167 };
168
169 clear ( )
170 Clears the queue of any items.
171
172 $q->clear;
173
174 end ( )
175 Stops the queue from receiving more items. Any worker blocking on
176 "dequeue" will be unblocked automatically. Subsequent calls to
177 "dequeue" will behave like "dequeue_nb". Current API available since
178 MCE::Shared 1.814.
179
180 $q->end();
181
182 MCE Models (e.g. MCE::Flow) may persist between runs. In that case, one
183 might want to enqueue "undef" values instead of calling "end". The number of
184 "undef" values depends on how many items workers dequeue at a time.
185
186 $q->enqueue((undef) x ($N_workers * 1)); # $q->dequeue() 1 item
187 $q->enqueue((undef) x ($N_workers * 2)); # $q->dequeue(2) 2 items
188 $q->enqueue((undef) x ($N_workers * N)); # $q->dequeue(N) N items
189
190 enqueue ( item [, item, ... ] )
191 Appends a list of items onto the end of the normal queue.
192
193 $q->enqueue( 'foo' );
194 $q->enqueue( 'bar', 'baz' );
195
196 enqueuep ( priority, item [, item, ... ] )
197 Appends a list of items onto the end of the priority queue with
198 priority.
199
200 $q->enqueuep( $priority, 'foo' );
201 $q->enqueuep( $priority, 'bar', 'baz' );
202
203 dequeue ( [ count ] )
204 Returns the requested number of items (default 1) from the queue.
205 Priority data will always dequeue first before any data from the normal
206 queue.
207
208 $q->dequeue( 2 );
209 $q->dequeue; # default 1
210
211 The method will block if the queue contains zero items. If the queue
212 contains fewer than the requested number of items, the method will not
213 block, but return whatever items there are on the queue.
214
215 The $count, used for requesting the number of items, is beneficial when
216 workers are passing parameters through the queue. For this reason,
217 always remember to dequeue using the same multiple for the count. This
218 is unlike Thread::Queue which will block until the requested number of
219 items are available.
220
221 # MCE::Shared::Queue 1.816 and prior releases
222 while ( my @items = $q->dequeue(2) ) {
223 last unless ( defined $items[0] );
224 ...
225 }
226
227 # MCE::Shared::Queue 1.817 and later
228 while ( my @items = $q->dequeue(2) ) {
229 ...
230 }
231
232 dequeue_nb ( [ count ] )
233 Returns the requested number of items (default 1) from the queue. Like
234 with dequeue, priority data will always dequeue first. This method is
235 non-blocking and returns "undef" in the absence of data.
236
237 $q->dequeue_nb( 2 );
238 $q->dequeue_nb; # default 1
239
240 insert ( index, item [, item, ... ] )
241 Adds the list of items to the queue at the specified index position (0
242 is the head of the list). The head of the queue is that item which
243 would be removed by a call to dequeue.
244
245 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
246 $q->enqueue(1, 2, 3, 4);
247 $q->insert(1, 'foo', 'bar');
248 # Queue now contains: 1, foo, bar, 2, 3, 4
249
250 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
251 $q->enqueue(1, 2, 3, 4);
252 $q->insert(1, 'foo', 'bar');
253 # Queue now contains: 1, 2, 3, foo, bar, 4
254
255 insertp ( priority, index, item [, item, ... ] )
256 Adds the list of items to the queue at the specified index position
257 with priority. The behavior is similar to "$q->insert" otherwise.
258
259 pending ( )
260 Returns the number of items in the queue. The count includes both
261 normal and priority data. Returns "undef" if the queue has been ended,
262 and there are no more items in the queue.
263
264 $q = MCE::Shared->queue();
265 $q->enqueuep(5, 'foo', 'bar');
266 $q->enqueue('sunny', 'day');
267
268 print $q->pending(), "\n";
269 # Output: 4
270
271 peek ( [ index ] )
272 Returns an item from the normal queue, at the specified index, without
273 dequeuing anything. It defaults to the head of the queue if index is
274 not specified. The head of the queue is that item which would be
275 removed by a call to dequeue. Negative index values are supported,
276 similarly to arrays.
277
278 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
279 $q->enqueue(1, 2, 3, 4, 5);
280
281 print $q->peek(1), ' ', $q->peek(-2), "\n";
282 # Output: 2 4
283
284 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
285 $q->enqueue(1, 2, 3, 4, 5);
286
287 print $q->peek(1), ' ', $q->peek(-2), "\n";
288 # Output: 4 2
289
290 peekp ( priority [, index ] )
291 Returns an item from the queue with priority, at the specified index,
292 without dequeuing anything. It defaults to the head of the queue if
293 index is not specified. The behavior is similar to "$q->peek"
294 otherwise.
295
296 peekh ( [ index ] )
297 Returns an item from the head of the heap or at the specified index.
298
299 $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
300 $q->enqueuep(5, 'foo');
301 $q->enqueuep(6, 'bar');
302 $q->enqueuep(4, 'sun');
303
304 print $q->peekh(0), "\n";
305 # Output: 6
306
307 $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
308 $q->enqueuep(5, 'foo');
309 $q->enqueuep(6, 'bar');
310 $q->enqueuep(4, 'sun');
311
312 print $q->peekh(0), "\n";
313 # Output: 4
314
315 heap ( )
316 Returns an array containing the heap data. Heap data consists of
317 priority numbers, not the data.
318
319 @h = $q->heap; # $MCE::Shared::Queue::HIGHEST
320 # Heap contains: 6, 5, 4
321
322 @h = $q->heap; # $MCE::Shared::Queue::LOWEST
323 # Heap contains: 4, 5, 6
324
326 · List::BinarySearch
327
328 The bsearch_num_pos method was helpful for accommodating the highest
329 and lowest order in MCE::Shared::Queue.
330
331 · POE::Queue::Array
332
333 For extra optimization, two if statements were adopted for checking
334 if the item belongs at the end or head of the queue.
335
336 · List::Priority
337
338 MCE::Shared::Queue supports both normal and priority queues.
339
340 · Thread::Queue
341
342 Thread::Queue is used as a template for identifying and documenting
343 the methods. MCE::Shared::Queue is not fully compatible due to
344 supporting normal and priority queues simultaneously; e.g.
345
346 $q->enqueue( $item [, $item, ... ] ); # normal queue
347 $q->enqueuep( $p, $item [, $item, ... ] ); # priority queue
348
349 $q->dequeue( [ $count ] ); # priority data dequeues first
350 $q->dequeue_nb( [ $count ] );
351
352 $q->pending(); # counts both normal/priority queues
353
355 Perl must have IO::FDPass for constructing a shared "condvar" or
356 "queue" while the shared-manager process is running. For platforms
357 where IO::FDPass isn't possible, construct "condvar" and "queue" before
358 other classes. On systems without "IO::FDPass", the manager process is
359 delayed until sharing other classes or started explicitly.
360
361 use MCE::Shared;
362
363 my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;
364
365 my $cv = MCE::Shared->condvar();
366 my $que = MCE::Shared->queue();
367
368 MCE::Shared->start() unless $has_IO_FDPass;
369
370 Regarding mce_open, "IO::FDPass" is needed for constructing a shared-
371 handle from a non-shared handle not yet available inside the shared-
372 manager process. The workaround is to have the non-shared handle made
373 before the shared-manager is started. Passing a file by reference is
374 fine for the three STD* handles.
375
376 # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.
377
378 mce_open my $shared_in, "<", \*STDIN; # ok
379 mce_open my $shared_out, ">>", \*STDOUT; # ok
380 mce_open my $shared_err, ">>", \*STDERR; # ok
381 mce_open my $shared_fh1, "<", "/path/to/sequence.fasta"; # ok
382 mce_open my $shared_fh2, ">>", "/path/to/results.log"; # ok
383
384 mce_open my $shared_fh, ">>", \*NON_SHARED_FH; # requires IO::FDPass
385
386 The IO::FDPass module is known to work reliably on most platforms.
387 Install 1.1 or later to be rid of the limitations described above.
388
389 perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"
390
392 MCE, MCE::Hobo, MCE::Shared
393
395 Mario E. Roy, <marioeroy AT gmail DOT com>
396
397
398
399perl v5.30.0 2019-09-19 MCE::Shared::Queue(3)