1MCE::Shared::Queue(3) User Contributed Perl Documentation MCE::Shared::Queue(3)
2
3
4
6 MCE::Shared::Queue - Hybrid-queue helper class
7
9 This document describes MCE::Shared::Queue version 1.840
10
12 A queue helper class for use as a standalone or managed by MCE::Shared.
13
14 This module is mostly compatible with MCE::Queue except for the
15 "gather" option which is not supported in this context. It provides a
16 queue interface supporting normal and priority queues. Data from shared
17 queues reside under the shared-manager process, otherwise locally.
18
20 # non-shared or local construction for use by a single process
21
22 use MCE::Shared::Queue;
23
24 my $qu = MCE::Shared::Queue->new(
25 await => 1, fast => 0, queue => [ "." ]
26 );
27
28 # construction for sharing with other threads and processes
29
30 use MCE::Shared;
31 use MCE::Shared::Queue;
32
33 my $qu = MCE::Shared->queue(
34 porder => $MCE::Shared::Queue::HIGHEST,
35 type => $MCE::Shared::Queue::FIFO,
36 fast => 0
37 );
38
39 # possible values for "porder" and "type"
40
41 porder =>
42 $MCE::Shared::Queue::HIGHEST # Highest priority items dequeue first
43 $MCE::Shared::Queue::LOWEST # Lowest priority items dequeue first
44
45 type =>
46 $MCE::Shared::Queue::FIFO # First in, first out
47 $MCE::Shared::Queue::LIFO # Last in, first out
48 $MCE::Shared::Queue::LILO # Synonym for FIFO
49 $MCE::Shared::Queue::FILO # Synonym for LIFO
50
51 # below, [ ... ] denotes optional parameters
52
53 $qu->await( [ $pending_threshold ] );
54 $qu->clear();
55 $qu->end();
56
57 $qu->enqueue( $item [, $item, ... ] );
58 $qu->enqueuep( $priority, $item [, $item, ... ] );
59
60 $item = $qu->dequeue();
61 @items = $qu->dequeue( $count );
62 $item = $qu->dequeue_nb();
63 @items = $qu->dequeue_nb( $count );
64
65 $qu->insert( $index, $item [, $item, ... ] );
66 $qu->insertp( $priority, $index, $item [, $item, ... ] );
67
68 $count = $qu->pending();
69 $item = $qu->peek( [ $index ] );
70 $item = $qu->peekp( $priority [, $index ] );
71 @array = $qu->heap();
72
74 new ( [ options ] )
75 Constructs a new object. Supported options are queue, porder, type,
76 await, and fast.
77
78 # non-shared or local construction for use by a single process
79
80 use MCE::Shared::Queue;
81
82 $q1 = MCE::Shared::Queue->new();
83 $q2 = MCE::Shared::Queue->new( queue => [ 0, 1, 2 ] );
84
85 $q3 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::HIGHEST );
86 $q4 = MCE::Shared::Queue->new( porder => $MCE::Shared::Queue::LOWEST );
87
88 $q5 = MCE::Shared::Queue->new( type => $MCE::Shared::Queue::FIFO );
89 $q6 = MCE::Shared::Queue->new( type => $MCE::Shared::Queue::LIFO );
90
91 $q7 = MCE::Shared::Queue->new( await => 1 );
92 $q8 = MCE::Shared::Queue->new( fast => 1 );
93
94 # construction for sharing with other threads and processes
95
96 use MCE::Shared;
97 use MCE::Shared::Queue;
98
99 $q1 = MCE::Shared->queue();
100 $q2 = MCE::Shared->queue( queue => [ 0, 1, 2 ] );
101
102 $q3 = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
103 $q4 = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
104
105 $q5 = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
106 $q6 = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
107
108 $q7 = MCE::Shared->queue( await => 1 );
109 $q8 = MCE::Shared->queue( fast => 1 );
110
111 The "await" option, when enabled, allows workers to block
112 (semaphore-like) until the number of items pending is equal to or less
113 than a threshold value. The "await" method is described below.
114
115 The "fast" option speeds up dequeues and is not enabled by default.
116 It is beneficial for queues that do not call ->clear or ->dequeue_nb
117 and do not alter the optional count value while running; e.g.
118 ->dequeue($count). Basically, do not enable 'fast' if varying the
119 count dynamically.
120
121 await ( pending_threshold )
122 Waits until the queue drops down to threshold items. The "await"
123 method is beneficial when wanting to throttle worker(s) appending to
124 the queue. Perhaps consumers are running a bit behind and you want to
125 prevent memory consumption from increasing too high. Below, the
126 number of items pending will never go above 20.
127
128 use Time::HiRes qw( sleep );
129
130 use MCE::Flow;
131 use MCE::Shared;
132
133 my $q = MCE::Shared->queue( await => 1, fast => 1 );
134 my ( $producers, $consumers ) = ( 1, 8 );
135
136 mce_flow {
137 task_name => [ 'producer', 'consumer' ],
138 max_workers => [ $producers, $consumers ],
139 },
140 sub {
141 ## producer
142 for my $item ( 1 .. 100 ) {
143 $q->enqueue($item);
144
145 ## blocks until the # of items pending reaches <= 10
146 if ($item % 10 == 0) {
147 MCE->say( 'pending: '.$q->pending() );
148 $q->await(10);
149 }
150 }
151
152 ## notify consumers no more work
153 $q->end();
154
155 },
156 sub {
157 ## consumers
158 while (defined (my $next = $q->dequeue())) {
159 MCE->say( MCE->task_wid().': '.$next );
160 sleep 0.100;
161 }
162 };
163
164 clear ( )
165 Clears the queue of any items.
166
167 $q->clear;
168
169 end ( )
170 Stops the queue from receiving more items. Any worker blocking on
171 "dequeue" will be unblocked automatically. Subsequent calls to
172 "dequeue" will behave like "dequeue_nb". Current API available since
173 MCE::Shared 1.814.
174
175 $q->end();
176
177 MCE Models (e.g. MCE::Flow) may persist between runs. In that case,
178 one might want to enqueue "undef"'s versus calling "end". The number
179 of "undef"'s depends on how many items workers dequeue at a time.
180
181 $q->enqueue((undef) x ($N_workers * 1)); # $q->dequeue() 1 item
182 $q->enqueue((undef) x ($N_workers * 2)); # $q->dequeue(2) 2 items
183 $q->enqueue((undef) x ($N_workers * N)); # $q->dequeue(N) N items
184
185 enqueue ( item [, item, ... ] )
186 Appends a list of items onto the end of the normal queue.
187
188 $q->enqueue( 'foo' );
189 $q->enqueue( 'bar', 'baz' );
190
191 enqueuep ( priority, item [, item, ... ] )
192 Appends a list of items onto the end of the priority queue with
193 priority.
194
195 $q->enqueuep( $priority, 'foo' );
196 $q->enqueuep( $priority, 'bar', 'baz' );
197
198 dequeue ( [ count ] )
199 Returns the requested number of items (default 1) from the queue.
200 Priority data will always dequeue first before any data from the
201 normal queue.
202
203 $q->dequeue( 2 );
204 $q->dequeue; # default 1
205
206 The method will block if the queue contains zero items. If the queue
207 contains fewer than the requested number of items, the method will
208 not block, but return whatever items there are on the queue.
209
210 The $count, used for requesting the number of items, is beneficial
211 when workers are passing parameters through the queue. For this
212 reason, always remember to dequeue using the same multiple for the
213 count. This is unlike Thread::Queue which will block until the
214 requested number of items are available.
215
216 # MCE::Shared::Queue 1.816 and prior releases
217 while ( my @items = $q->dequeue(2) ) {
218 last unless ( defined $items[0] );
219 ...
220 }
221
222 # MCE::Shared::Queue 1.817 and later
223 while ( my @items = $q->dequeue(2) ) {
224 ...
225 }
226
227 dequeue_nb ( [ count ] )
228 Returns the requested number of items (default 1) from the queue.
229 Like with dequeue, priority data will always dequeue first. This
230 method is non-blocking and returns "undef" in the absence of data.
231
232 $q->dequeue_nb( 2 );
233 $q->dequeue_nb; # default 1
234
235 insert ( index, item [, item, ... ] )
236 Adds the list of items to the queue at the specified index position
237 (0 is the head of the list). The head of the queue is that item
238 which would be removed by a call to dequeue.
239
240 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
241 $q->enqueue(1, 2, 3, 4);
242 $q->insert(1, 'foo', 'bar');
243 # Queue now contains: 1, foo, bar, 2, 3, 4
244
245 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
246 $q->enqueue(1, 2, 3, 4);
247 $q->insert(1, 'foo', 'bar');
248 # Queue now contains: 1, 2, 3, 'foo', 'bar', 4
249
250 insertp ( priority, index, item [, item, ... ] )
251 Adds the list of items to the queue at the specified index position
252 with priority. The behavior is otherwise similar to "$q->insert".
253
254 pending ( )
255 Returns the number of items in the queue. The count includes both
256 normal and priority data. Returns "undef" if the queue has been
257 ended, and there are no more items in the queue.
258
259 $q = MCE::Shared->queue();
260 $q->enqueuep(5, 'foo', 'bar');
261 $q->enqueue('sunny', 'day');
262
263 print $q->pending(), "\n";
264 # Output: 4
265
266 peek ( [ index ] )
267 Returns an item from the normal queue, at the specified index,
268 without dequeuing anything. It defaults to the head of the queue if
269 index is not specified. The head of the queue is that item which
270 would be removed by a call to dequeue. Negative index values are
271 supported, similarly to arrays.
272
273 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::FIFO );
274 $q->enqueue(1, 2, 3, 4, 5);
275
276 print $q->peek(1), ' ', $q->peek(-2), "\n";
277 # Output: 2 4
278
279 $q = MCE::Shared->queue( type => $MCE::Shared::Queue::LIFO );
280 $q->enqueue(1, 2, 3, 4, 5);
281
282 print $q->peek(1), ' ', $q->peek(-2), "\n";
283 # Output: 4 2
284
285 peekp ( priority [, index ] )
286 Returns an item from the queue with priority, at the specified
287 index, without dequeuing anything. It defaults to the head of the
288 queue if index is not specified. The behavior is otherwise similar
289 to "$q->peek".
290
291 peekh ( [ index ] )
292 Returns an item from the head of the heap or at the specified index.
293
294 $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::HIGHEST );
295 $q->enqueuep(5, 'foo');
296 $q->enqueuep(6, 'bar');
297 $q->enqueuep(4, 'sun');
298
299 print $q->peekh(0), "\n";
300 # Output: 6
301
302 $q = MCE::Shared->queue( porder => $MCE::Shared::Queue::LOWEST );
303 $q->enqueuep(5, 'foo');
304 $q->enqueuep(6, 'bar');
305 $q->enqueuep(4, 'sun');
306
307 print $q->peekh(0), "\n";
308 # Output: 4
309
310 heap ( )
311 Returns an array containing the heap data. Heap data consists of
312 priority numbers, not the data.
313
314 @h = $q->heap; # $MCE::Shared::Queue::HIGHEST
315 # Heap contains: 6, 5, 4
316
317 @h = $q->heap; # $MCE::Shared::Queue::LOWEST
318 # Heap contains: 4, 5, 6
319
321 List::BinarySearch
322 The bsearch_num_pos method was helpful for accommodating the highest
323 and lowest order in MCE::Shared::Queue.
324
325 POE::Queue::Array
326 For extra optimization, two if statements were adopted for checking
327 if the item belongs at the end or head of the queue.
328
329 List::Priority
330 MCE::Shared::Queue supports both normal and priority queues.
331
332 Thread::Queue
333 Thread::Queue is used as a template for identifying and documenting
334 the methods. MCE::Shared::Queue is not fully compatible due to
335 supporting normal and priority queues simultaneously; e.g.
336
337 $q->enqueue( $item [, $item, ... ] ); # normal queue
338 $q->enqueuep( $p, $item [, $item, ... ] ); # priority queue
339
340 $q->dequeue( [ $count ] ); # priority data dequeues first
341 $q->dequeue_nb( [ $count ] );
342
343 $q->pending(); # counts both normal/priority queues
344
346 Perl must have IO::FDPass for constructing a shared "condvar" or
347 "queue" while the shared-manager process is running. For platforms
348 where IO::FDPass isn't possible, construct "condvar" and "queue" before
349 other classes. On systems without "IO::FDPass", the manager process is
350 delayed until sharing other classes or started explicitly.
351
352 use MCE::Shared;
353
354 my $has_IO_FDPass = $INC{'IO/FDPass.pm'} ? 1 : 0;
355
356 my $cv = MCE::Shared->condvar();
357 my $que = MCE::Shared->queue();
358
359 MCE::Shared->start() unless $has_IO_FDPass;
360
361 Regarding mce_open, "IO::FDPass" is needed for constructing a shared-
362 handle from a non-shared handle not yet available inside the shared-
363 manager process. The workaround is to have the non-shared handle made
364 before the shared-manager is started. Passing a file by reference is
365 fine for the three STD* handles.
366
367 # The shared-manager knows of \*STDIN, \*STDOUT, \*STDERR.
368
369 mce_open my $shared_in, "<", \*STDIN; # ok
370 mce_open my $shared_out, ">>", \*STDOUT; # ok
371 mce_open my $shared_err, ">>", \*STDERR; # ok
372 mce_open my $shared_fh1, "<", "/path/to/sequence.fasta"; # ok
373 mce_open my $shared_fh2, ">>", "/path/to/results.log"; # ok
374
375 mce_open my $shared_fh, ">>", \*NON_SHARED_FH; # requires IO::FDPass
376
377 The IO::FDPass module is known to work reliably on most platforms.
378 Install 1.1 or later to be rid of the limitations described above.
379
380 perl -MIO::FDPass -le "print 'Cheers! Perl has IO::FDPass.'"
381
383 MCE, MCE::Hobo, MCE::Shared
384
386 Mario E. Roy, <marioeroy AT gmail DOT com>
387
388
389
390perl v5.28.1 2019-01-04 MCE::Shared::Queue(3)