1MCE::Examples(3) User Contributed Perl Documentation MCE::Examples(3)
2
3
4
6 MCE::Examples - Various examples and demonstrations
7
9 This document describes MCE::Examples version 1.837
10
12 A wrapper script for parallelizing the grep binary. Hence, processing
13 is done by the binary, not Perl. This wrapper resides under the bin
14 directory.
15
16 mce_grep
17 A wrapper script with support for the following C binaries.
18 agrep, grep, egrep, fgrep, and tre-agrep
19
20 Chunking may be applied either at the [file] level, for large
21 file(s), or at the [list] level when parsing many files
22 recursively.
23
24 The gain in performance is noticeable for expensive patterns,
25 especially with agrep and tre-agrep.
26
28 The examples directory, beginning with 1.700, is maintained separately
29 at a GitHub repository <https://github.com/marioroy/mce-examples> and
30 is no longer included with the Perl MCE distribution.
31
33 The next section describes ways to process input data in MCE.
34
35 CHUNK_SIZE => 1 (in essence, disabling chunking)
36 Imagine a long-running process and wanting to parallelize an array
37 against a pool of workers. The sequence option may be used if simply
38 wanting to loop through a sequence of numbers instead.
39
40 Below, a callback function is used for displaying results. The logic
41 shows how one can output results immediately while still preserving
42 output order as if processing serially. The %result_n and %result_d
43 hashes are temporary caches for out-of-order results.
44
45 use MCE;
46
47 ## Return an iterator for preserving output order.
48
49 sub preserve_order {
50 my (%result_n, %result_d); my $order_id = 1;
51
52 return sub {
53 my ($chunk_id, $n, $data) = @_;
54
55 $result_n{ $chunk_id } = $n;
56 $result_d{ $chunk_id } = $data;
57
58 while (1) {
59 last unless exists $result_d{$order_id};
60
61 printf "n: %5d sqrt(n): %7.3f\n",
62 $result_n{$order_id}, $result_d{$order_id};
63
64 delete $result_n{$order_id};
65 delete $result_d{$order_id};
66
67 $order_id++;
68 }
69
70 return;
71 };
72 }
73
74 ## Use $chunk_ref->[0] or $_ to retrieve the element.
75 my @input_data = (0 .. 18000 - 1);
76
77 my $mce = MCE->new(
78 gather => preserve_order, input_data => \@input_data,
79 chunk_size => 1, max_workers => 3,
80
81 user_func => sub {
82 my ($mce, $chunk_ref, $chunk_id) = @_;
83 MCE->gather($chunk_id, $_, sqrt($_));
84 }
85 );
86
87 $mce->run;
88
89 This does the same thing using the foreach "sugar" method.
90
91 use MCE;
92
93 sub preserve_order {
94 ...
95 }
96
97 my $mce = MCE->new(
98 chunk_size => 1, max_workers => 3,
99 gather => preserve_order
100 );
101
102 ## Use $chunk_ref->[0] or $_ to retrieve the element.
103 my @input_data = (0 .. 18000 - 1);
104
105 $mce->foreach( \@input_data, sub {
106 my ($mce, $chunk_ref, $chunk_id) = @_;
107 MCE->gather($chunk_id, $_, sqrt($_));
108 });
109
110 The 2 examples described above were done using the Core API. MCE 1.5
111 comes with several models. The MCE::Loop model is used below.
112
113 use MCE::Loop;
114
115 sub preserve_order {
116 ...
117 }
118
119 MCE::Loop::init {
120 chunk_size => 1, max_workers => 3,
121 gather => preserve_order
122 };
123
124 ## Use $chunk_ref->[0] or $_ to retrieve the element.
125 my @input_data = (0 .. 18000 - 1);
126
127 mce_loop {
128 my ($mce, $chunk_ref, $chunk_id) = @_;
129 MCE->gather($chunk_id, $_, sqrt($_));
130
131 } @input_data;
132
133 MCE::Loop::finish;
134
135 CHUNKING INPUT DATA
136 Chunking has the effect of reducing IPC overhead manyfold. A chunk
137 containing $chunk_size items is sent to the next available worker.
138
139 use MCE;
140
141 ## Return an iterator for preserving output order.
142
143 sub preserve_order {
144 my (%result_n, %result_d, $size); my $order_id = 1;
145
146 return sub {
147 my ($chunk_id, $n_ref, $data_ref) = @_;
148
149 $result_n{ $chunk_id } = $n_ref;
150 $result_d{ $chunk_id } = $data_ref;
151
152 while (1) {
153 last unless exists $result_d{$order_id};
154 $size = @{ $result_d{$order_id} };
155
156 for (0 .. $size - 1) {
157 printf "n: %5d sqrt(n): %7.3f\n",
158 $result_n{$order_id}->[$_], $result_d{$order_id}->[$_];
159 }
160
161 delete $result_n{$order_id};
162 delete $result_d{$order_id};
163
164 $order_id++;
165 }
166
167 return;
168 };
169 }
170
171 ## Chunking requires one to loop inside the code block.
172 my @input_data = (0 .. 18000 - 1);
173
174 my $mce = MCE->new(
175 gather => preserve_order, input_data => \@input_data,
176 chunk_size => 500, max_workers => 3,
177
178 user_func => sub {
179 my ($mce, $chunk_ref, $chunk_id) = @_;
180 my (@n, @result);
181
182 foreach ( @{ $chunk_ref } ) {
183 push @n, $_;
184 push @result, sqrt($_);
185 }
186
187 MCE->gather($chunk_id, \@n, \@result);
188 }
189 );
190
191 $mce->run;
192
193 This does the same thing using the forchunk "sugar" method.
194
195 use MCE;
196
197 sub preserve_order {
198 ...
199 }
200
201 my $mce = MCE->new(
202 chunk_size => 500, max_workers => 3,
203 gather => preserve_order
204 );
205
206 ## Chunking requires one to loop inside the code block.
207 my @input_data = (0 .. 18000 - 1);
208
209 $mce->forchunk( \@input_data, sub {
210 my ($mce, $chunk_ref, $chunk_id) = @_;
211 my (@n, @result);
212
213 foreach ( @{ $chunk_ref } ) {
214 push @n, $_;
215 push @result, sqrt($_);
216 }
217
218 MCE->gather($chunk_id, \@n, \@result);
219 });
220
221 Finally, chunking with the MCE::Loop model.
222
223 use MCE::Loop;
224
225 sub preserve_order {
226 ...
227 }
228
229 MCE::Loop::init {
230 chunk_size => 500, max_workers => 3,
231 gather => preserve_order
232 };
233
234 ## Chunking requires one to loop inside the code block.
235 my @input_data = (0 .. 18000 - 1);
236
237 mce_loop {
238 my ($mce, $chunk_ref, $chunk_id) = @_;
239 my (@n, @result);
240
241 foreach ( @{ $chunk_ref } ) {
242 push @n, $_;
243 push @result, sqrt($_);
244 }
245
246 MCE->gather($chunk_id, \@n, \@result);
247
248 } @input_data;
249
250 MCE::Loop::finish;
251
253 The following is an extract from the seq_demo.pl example included with
254 MCE. Think of having several MCEs running in parallel. The sequence
255 and chunk_size options may be specified uniquely for each task.
256
257 The input scalar $_ (not shown below) contains the same value as $seq_n
258 in user_func.
259
260 use MCE;
261 use Time::HiRes 'sleep';
262
263 ## Run with seq_demo.pl | sort
264
265 sub user_func {
266 my ($mce, $seq_n, $chunk_id) = @_;
267
268 my $wid = MCE->wid;
269 my $task_id = MCE->task_id;
270 my $task_wid = MCE->task_wid;
271
272 if (ref $seq_n eq 'ARRAY') {
273 ## seq_n or $_ is an array reference when chunk_size > 1
274 foreach (@{ $seq_n }) {
275 MCE->printf(
276 "task_id %d: seq_n %s: chunk_id %d: wid %d: task_wid %d\n",
277 $task_id, $_, $chunk_id, $wid, $task_wid
278 );
279 }
280 }
281 else {
282 MCE->printf(
283 "task_id %d: seq_n %s: chunk_id %d: wid %d: task_wid %d\n",
284 $task_id, $seq_n, $chunk_id, $wid, $task_wid
285 );
286 }
287
288 sleep 0.003;
289
290 return;
291 }
292
293 ## Each task can be configured uniquely.
294
295 my $mce = MCE->new(
296 user_tasks => [{
297 max_workers => 2,
298 chunk_size => 1,
299 sequence => { begin => 11, end => 19, step => 1 },
300 user_func => \&user_func
301 },{
302 max_workers => 2,
303 chunk_size => 5,
304 sequence => { begin => 21, end => 29, step => 1 },
305 user_func => \&user_func
306 },{
307 max_workers => 2,
308 chunk_size => 3,
309 sequence => { begin => 31, end => 39, step => 1 },
310 user_func => \&user_func
311 }]
312 );
313
314 $mce->run;
315
316 -- Output
317
318 task_id 0: seq_n 11: chunk_id 1: wid 2: task_wid 2
319 task_id 0: seq_n 12: chunk_id 2: wid 1: task_wid 1
320 task_id 0: seq_n 13: chunk_id 3: wid 2: task_wid 2
321 task_id 0: seq_n 14: chunk_id 4: wid 1: task_wid 1
322 task_id 0: seq_n 15: chunk_id 5: wid 2: task_wid 2
323 task_id 0: seq_n 16: chunk_id 6: wid 1: task_wid 1
324 task_id 0: seq_n 17: chunk_id 7: wid 2: task_wid 2
325 task_id 0: seq_n 18: chunk_id 8: wid 1: task_wid 1
326 task_id 0: seq_n 19: chunk_id 9: wid 2: task_wid 2
327 task_id 1: seq_n 21: chunk_id 1: wid 3: task_wid 1
328 task_id 1: seq_n 22: chunk_id 1: wid 3: task_wid 1
329 task_id 1: seq_n 23: chunk_id 1: wid 3: task_wid 1
330 task_id 1: seq_n 24: chunk_id 1: wid 3: task_wid 1
331 task_id 1: seq_n 25: chunk_id 1: wid 3: task_wid 1
332 task_id 1: seq_n 26: chunk_id 2: wid 4: task_wid 2
333 task_id 1: seq_n 27: chunk_id 2: wid 4: task_wid 2
334 task_id 1: seq_n 28: chunk_id 2: wid 4: task_wid 2
335 task_id 1: seq_n 29: chunk_id 2: wid 4: task_wid 2
336 task_id 2: seq_n 31: chunk_id 1: wid 5: task_wid 1
337 task_id 2: seq_n 32: chunk_id 1: wid 5: task_wid 1
338 task_id 2: seq_n 33: chunk_id 1: wid 5: task_wid 1
339 task_id 2: seq_n 34: chunk_id 2: wid 6: task_wid 2
340 task_id 2: seq_n 35: chunk_id 2: wid 6: task_wid 2
341 task_id 2: seq_n 36: chunk_id 2: wid 6: task_wid 2
342 task_id 2: seq_n 37: chunk_id 3: wid 5: task_wid 1
343 task_id 2: seq_n 38: chunk_id 3: wid 5: task_wid 1
344 task_id 2: seq_n 39: chunk_id 3: wid 5: task_wid 1
345
347 It is possible that Perl may create a new code ref on subsequent runs
348 causing MCE models to re-spawn. One solution to this is to declare
349 global variables, referenced by workers, with "our" instead of "my".
350
351 Let's take a look. The $i variable is declared with my and is being
352 referenced in both user_begin and mce_loop blocks. This will cause Perl
353 to create a new code ref for mce_loop on subsequent runs.
354
355 use MCE::Loop;
356
357 my $i = 0; ## <-- this is the reason, try our instead
358
359 MCE::Loop::init {
360 user_begin => sub {
361 print "process_id: $$\n" if MCE->wid == 1;
362 $i++;
363 },
364 chunk_size => 1, max_workers => 'auto',
365 };
366
367 for (1..2) {
368 ## Perl creates another code block ref causing workers
369 ## to re-spawn on subsequent runs.
370 print "\n"; mce_loop { print "$i: $_\n" } 1..4;
371 }
372
373 MCE::Loop::finish;
374
375 -- Output
376
377 process_id: 51380
378 1: 1
379 1: 2
380 1: 3
381 1: 4
382
383 process_id: 51388
384 1: 1
385 1: 2
386 1: 3
387 1: 4
388
389 By making the one line change, we see that workers persist for the
390 duration of the script.
391
392 use MCE::Loop;
393
394 our $i = 0; ## <-- changed my to our
395
396 MCE::Loop::init {
397 user_begin => sub {
398 print "process_id: $$\n" if MCE->wid == 1;
399 $i++;
400 },
401 chunk_size => 1, max_workers => 'auto',
402 };
403
404 for (1..2) {
405 ## Workers persist between runs. No re-spawning.
406 print "\n"; mce_loop { print "$i: $_\n" } 1..4;
407 }
408
409 -- Output
410
411 process_id: 51457
412 1: 1
413 1: 2
414 1: 4
415 1: 3
416
417 process_id: 51457
418 2: 1
419 2: 2
420 2: 3
421 2: 4
422
423 One may alternatively specify a code reference to existing routines for
424 user_begin and mce_loop. Take notice of the comma after \&_func though.
425
426 use MCE::Loop;
427
428 my $i = 0; ## my (ok)
429
430 sub _begin {
431 print "process_id: $$\n" if MCE->wid == 1;
432 $i++;
433 }
434 sub _func {
435 print "$i: $_\n";
436 }
437
438 MCE::Loop::init {
439 user_begin => \&_begin,
440 chunk_size => 1, max_workers => 'auto',
441 };
442
443 for (1..2) {
444 print "\n"; mce_loop \&_func, 1..4;
445 }
446
447 MCE::Loop::finish;
448
449 -- Output
450
451 process_id: 51626
452 1: 1
453 1: 2
454 1: 3
455 1: 4
456
457 process_id: 51626
458 2: 1
459 2: 2
460 2: 3
461 2: 4
462
464 For the next demonstration, MCE::Relay allows a section of code to run
465 serially and orderly between workers. Relay capabilities are enabled
466 with the "init_relay" option, which loads MCE::Relay.
467
468 # perl mandelbrot.pl 16000 > image.pbm
469 # outputs a pbm binary to STDOUT
470
471 # The Computer Language Benchmarks Game
472 # http://benchmarksgame.alioth.debian.org/
473 #
474 # Started with:
475 # C# : Adapted by Antti Lankila from Isaac Gouy's implementation
476 # Perl: Contributed by Mykola Zubach
477 #
478 # MCE::Loop version by Mario Roy
479 # requires MCE 1.807+
480
481 use strict;
482 use warnings;
483
484 use MCE::Loop;
485
486 use constant MAXITER => 50;
487 use constant LIMIT => 4.0;
488 use constant XMIN => -1.5;
489 use constant YMIN => -1.0;
490
491 my ( $w, $h, $m, $invN );
492
493 sub draw_lines {
494 my ( $y1, $y2 ) = @_;
495 my @result;
496
497 # Workers run simultaneously, in parallel.
498
499 for my $y ( $y1 .. $y2 ) {
500 my ( $bits, $xcounter, @line ) = ( 0, 0 );
501 my $Ci = $y * $invN + YMIN;
502
503 for my $x ( 0 .. $w - 1 ) {
504 my ( $Zr, $Zi, $Tr, $Ti ) = ( 0, 0, 0, 0 );
505 my $Cr = $x * $invN + XMIN;
506
507 $bits = $bits << 1;
508
509 for ( 1 .. MAXITER ) {
510 $Zi = $Zi * 2 * $Zr + $Ci;
511 $Zr = $Tr - $Ti + $Cr;
512 $Ti = $Zi * $Zi, $Tr = $Zr * $Zr;
513
514 $bits |= 1, last if ( $Tr + $Ti > LIMIT );
515 }
516
517 if ( ++$xcounter == 8 ) {
518 push @line, $bits ^ 0xff;
519 $bits = $xcounter = 0;
520 }
521 }
522
523 if ( $xcounter ) {
524 push @line, ( $bits << ( 8 - $xcounter ) ) ^ 0xff;
525 }
526
527 push @result, pack 'C*', @line;
528 }
529
530 # Statements between lock & unlock are processed serially & orderly.
531
532 MCE->relay_lock;
533
534 print @result; # Workers display upper-half only.
535 MCE->gather( @result ); # Gather lines for the manager-process.
536
537 MCE->relay_unlock;
538 }
539
540 ## MAIN()
541
542 # Important, must flush output immediately.
543
544 $| = 1; binmode STDOUT;
545
546 $w = $h = shift || 200;
547 $m = int( $h / 2 );
548 $invN = 2 / $w;
549
550 print "P4\n$w $h\n"; # PBM image header.
551
552 # Workers display upper-half only. Also, lines are gathered to be
553 # displayed later by the manager-process after running.
554
555 MCE::Loop->init(
556 init_relay => 0, # Enables MCE::Relay capabilities if defined.
557 max_workers => 4,
558 bounds_only => 1,
559 );
560
561 my @upper = mce_loop_s { draw_lines( $_[1][0], $_[1][1] ) } 0, $m;
562
563 MCE::Loop->finish;
564
565 # Remove first and last lines from the upper half.
566 # Then, output bottom half.
567
568 shift @upper, pop @upper;
569 print reverse @upper;
570
572 There is an article on the web (search for comp.lang.perl.misc MCE)
573 suggesting that MCE::Examples does not cover a simple simulation
574 scenario. This section demonstrates just that.
575
576 The serial code is based off the one by "gamo". A sleep is added to
577 imitate extra CPU time. The while loop is wrapped within a for loop to
578 run 10 times. The random number generator is seeded as well.
579
580 use Time::HiRes qw/sleep time/;
581
582 srand 5906;
583
584 my ($var, $foo, $bar) = (1, 2, 3);
585 my ($r, $a, $b);
586
587 my $start = time;
588
589 for (1..10) {
590 while (1) {
591 $r = rand;
592
593 $a = $r * ($var + $foo + $bar);
594 $b = sqrt($var + $foo + $bar);
595
596 last if ($a < $b + 0.001 && $a > $b - 0.001);
597 sleep 0.002;
598 }
599
600 print "$r -> $a\n";
601 }
602
603 my $end = time;
604
605 printf {*STDERR} "\n## compute time: %0.03f secs\n\n", $end - $start;
606
607 -- Output
608
609 0.408246276657106 -> 2.44947765994264
610 0.408099657137821 -> 2.44859794282693
611 0.408285842931324 -> 2.44971505758794
612 0.408342292008765 -> 2.45005375205259
613 0.408333076522673 -> 2.44999845913604
614 0.408344266898869 -> 2.45006560139321
615 0.408084104120526 -> 2.44850462472316
616 0.408197400014714 -> 2.44918440008828
617 0.408344783704855 -> 2.45006870222913
618 0.408248062985479 -> 2.44948837791287
619
620 ## compute time: 93.049 secs
621
622 Next, we'd do the same with MCE. The demonstration requires at least
623 MCE 1.509 to run properly. Folks on prior releases (1.505 - 1.508) will
624 not see output for the 2nd run and beyond.
625
626 use Time::HiRes qw/sleep time/;
627 use MCE::Loop;
628
629 srand 5906;
630
631 ## Configure MCE. Move common variables inside the user_begin
632 ## block when not needed by the manager process.
633
634 MCE::Loop::init {
635 user_begin => sub {
636 use vars qw($var $foo $bar);
637 our ($var, $foo, $bar) = (1, 2, 3);
638 },
639 chunk_size => 1, max_workers => 'auto',
640 input_data => \&_input, gather => \&_gather
641 };
642
643 ## Callback functions.
644
645 my ($done, $r, $a);
646
647 sub _input {
648 return if $done;
649 return rand;
650 }
651
652 sub _gather {
653 my ($_r, $_a, $_b) = @_;
654 return if $done;
655
656 if ($_a < $_b + 0.001 && $_a > $_b - 0.001) {
657 ($done, $r, $a) = (1, $_r, $_a);
658 }
659 return;
660 }
661
662 ## Compute in parallel.
663
664 my $start = time;
665
666 for (1..10) {
667 $done = 0; ## Reset $done before running
668
669 mce_loop {
670 # my ($mce, $chunk_ref, $chunk_id) = @_;
671 # my $r = $chunk_ref->[0];
672
673 my $r = $_; ## Valid due to chunk_size => 1
674
675 my $a = $r * ($var + $foo + $bar);
676 my $b = sqrt($var + $foo + $bar);
677
678 MCE->gather($r, $a, $b);
679 sleep 0.002;
680 };
681
682 print "$r -> $a\n";
683 }
684
685 printf "\n## compute time: %0.03f secs\n\n", time - $start;
686
687 -- Output
688
689 0.408246276657106 -> 2.44947765994264
690 0.408099657137821 -> 2.44859794282693
691 0.408285842931324 -> 2.44971505758794
692 0.408342292008765 -> 2.45005375205259
693 0.408333076522673 -> 2.44999845913604
694 0.408344266898869 -> 2.45006560139321
695 0.408084104120526 -> 2.44850462472316
696 0.408197400014714 -> 2.44918440008828
697 0.408344783704855 -> 2.45006870222913
698 0.408248062985479 -> 2.44948837791287
699
700 ## compute time: 12.990 secs
701
702 Well, there you have it. MCE is able to complete the same simulation
703 many times faster.
704
706 There are occasions when one wants several workers to run in parallel
707 without having to specify input_data or sequence. These two options are
708 optional in MCE. The "do" and "sendto" methods, for sending data to the
709 manager process, are demonstrated below. Both are processed serially by
710 the manager process on a first-come, first-served basis.
711
712 use MCE::Flow max_workers => 4;
713
714 sub report_stats {
715 my ($wid, $msg, $h_ref) = @_;
716 print "Worker $wid says $msg: ", $h_ref->{"counter"}, "\n";
717 }
718
719 mce_flow sub {
720 my ($mce) = @_;
721 my $wid = MCE->wid;
722
723 if ($wid == 1) {
724 my %h = ("counter" => 0);
725 while (1) {
726 $h{"counter"} += 1;
727 MCE->do("report_stats", $wid, "Hey there", \%h);
728 last if ($h{"counter"} == 4);
729 sleep 2;
730 }
731 }
732 else {
733 my %h = ("counter" => 0);
734 while (1) {
735 $h{"counter"} += 1;
736 MCE->do("report_stats", $wid, "Welcome..", \%h);
737 last if ($h{"counter"} == 2);
738 sleep 4;
739 }
740 }
741
742 MCE->print(\*STDERR, "Worker $wid is exiting\n");
743 };
744
745 -- Output
746
747 Note how worker 2 comes first in the 2nd run below.
748
749 $ ./demo.pl
750 Worker 1 says Hey there: 1
751 Worker 2 says Welcome..: 1
752 Worker 3 says Welcome..: 1
753 Worker 4 says Welcome..: 1
754 Worker 1 says Hey there: 2
755 Worker 2 says Welcome..: 2
756 Worker 3 says Welcome..: 2
757 Worker 1 says Hey there: 3
758 Worker 2 is exiting
759 Worker 3 is exiting
760 Worker 4 says Welcome..: 2
761 Worker 4 is exiting
762 Worker 1 says Hey there: 4
763 Worker 1 is exiting
764
765 $ ./demo.pl
766 Worker 2 says Welcome..: 1
767 Worker 1 says Hey there: 1
768 Worker 4 says Welcome..: 1
769 Worker 3 says Welcome..: 1
770 Worker 1 says Hey there: 2
771 Worker 2 says Welcome..: 2
772 Worker 4 says Welcome..: 2
773 Worker 3 says Welcome..: 2
774 Worker 2 is exiting
775 Worker 4 is exiting
776 Worker 1 says Hey there: 3
777 Worker 3 is exiting
778 Worker 1 says Hey there: 4
779 Worker 1 is exiting
780
782 Capturing "STDERR" and "STDOUT" is possible with App::Cmd::Tester. MCE
783 v1.708 or later is required to run the demonstration.
784
785 use App::Cmd::Tester;
786 use MCE;
787
788 my $mce = MCE->new(
789 max_workers => 4,
790
791 user_func => sub {
792 my $wid = MCE->wid;
793
794 # MCE->sendto('stderr', "$wid: sendto err\n");
795 # MCE->sendto(\*STDERR, "$wid: sendto err\n");
796 MCE->print(\*STDERR, "$wid: print err\n");
797
798 # MCE->sendto('stdout', "$wid: sendto out\n");
799 # MCE->sendto(\*STDOUT, "$wid: sendto out\n");
800 # MCE->print(\*STDOUT, "$wid: print out\n");
801 MCE->print("$wid: print out\n");
802 }
803 );
804
805 my $result = test_app(
806 $mce => []
807 );
808
809 print "# stderr\n";
810 print $result->stderr;
811 print "\n";
812
813 print "# stdout\n";
814 print $result->stdout;
815 print "\n";
816
817 print "# output\n";
818 print $result->output;
819 print "\n";
820
821 print "# exit code\n";
822 print $result->exit_code;
823 print "\n\n";
824
825 -- Output
826
827 # stderr
828 3: print err
829 4: print err
830 1: print err
831 2: print err
832
833 # stdout
834 3: print out
835 4: print out
836 1: print out
837 2: print out
838
839 # output
840 3: print err
841 3: print out
842 4: print err
843 1: print err
844 4: print out
845 1: print out
846 2: print err
847 2: print out
848
849 # exit code
850 0
851
852 The next demonstration captures a sequence of numbers orderly. The slot
853 name for "IO::TieCombine" must be "stdout" or "stderr" for MCE->print
854 to work.
855
856 use MCE::Flow;
857 use MCE::Candy;
858 use IO::TieCombine;
859
860 my $hub = IO::TieCombine->new;
861
862 {
863 tie local *STDOUT, $hub, 'stdout';
864
865 MCE::Flow::init {
866 max_workers => 4,
867 chunk_size => 500,
868 bounds_only => 1,
869 gather => MCE::Candy::out_iter_fh(\*STDOUT),
870 };
871
872 mce_flow_s sub {
873 my ($mce, $seq, $chunk_id) = @_;
874 my $output = '';
875
876 for my $n ( $seq->[0] .. $seq->[1] ) {
877 $output .= "$n\n";
878 }
879
880 # do this if output order is not required
881 # $mce->print(\*STDOUT, $output);
882
883 # or this if preserving output order is desired
884 $mce->gather($chunk_id, $output);
885
886 }, 1, 100000;
887
888 MCE::Flow::finish;
889 }
890
891 my $content = $hub->slot_contents('stdout');
892 my $answer = join("", map { "$_\n" } 1..100000);
893
894 if ($content eq $answer) {
895 print "ordered: yes\n";
896 } else {
897 print "ordered: no\n";
898 }
899
900 -- Output
901
902 ordered: yes
903
905 Making an executable is possible with the PAR::Packer module. On the
906 Windows platform, threads, threads::shared, and exiting via threads are
907 necessary for the binary to exit successfully.
908
909 # https://metacpan.org/pod/PAR::Packer
910 # https://metacpan.org/pod/pp
911 #
912 # pp -o demo.exe demo.pl
913 # ./demo.exe
914
915 use strict;
916 use warnings;
917
918 use if $^O eq "MSWin32", "threads";
919 use if $^O eq "MSWin32", "threads::shared";
920
921 use Time::HiRes (); # include minimum dependencies for MCE
922 use Storable ();
923
924 use IO::FDPass (); # optional: for MCE::Shared->condvar, handle, queue
925 use Sereal (); # optional: faster serialization, may omit Storable
926
927 use MCE;
928
929 my $mce = MCE->new(
930 max_workers => 4,
931 user_func => sub {
932 print "hello from ", MCE->wid(), "\n";
933 }
934 );
935
936 $mce->run();
937
938 threads->exit(0) if $INC{"threads.pm"};
939
940 With MCE::Shared 1.808 and later releases, MCE::Hobo works just the
941 same. The following compiles fine on UNIX and the Windows platform.
942
943 # https://metacpan.org/pod/PAR::Packer
944 # https://metacpan.org/pod/pp
945 #
946 # pp -o demo.exe demo.pl
947 # ./demo.exe
948
949 use strict;
950 use warnings;
951
952 use if $^O eq "MSWin32", "threads";
953 use if $^O eq "MSWin32", "threads::shared";
954
955 use Time::HiRes (); # include minimum dependencies for MCE::Hobo
956 use Storable ();
957
958 use IO::FDPass (); # optional: for MCE::Shared->condvar, handle, queue
959 use Sereal (); # optional: faster serialization, may omit Storable
960
961 use MCE::Hobo; # 1.808 or later on Windows
962 use MCE::Shared;
963
964 my $seq_a = MCE::Shared->sequence( 1, 30 );
965
966 sub task {
967 my ( $id ) = @_;
968 while ( defined ( my $num = $seq_a->next ) ) {
969 print "$id: $num\n";
970 }
971 }
972
973 MCE::Hobo->new( \&task, $_ ) for 1 .. 2;
974 MCE::Hobo->waitall;
975
976 threads->exit(0) if $INC{"threads.pm"};
977
979 The demonstrations require MCE 1.804 to run. Otherwise, the MCE
980 "posix_exit" option must be specified and set to 1. This applies to
981 UNIX only and is set automatically in 1.804 when "(F)CGI.pm" is present.
982
983 #!/usr/bin/perl
984
985 # http://127.0.0.1/cgi-bin/test_mce1.fcgi
986 # http://127.0.0.1/cgi-bin/test_mce1.fcgi?size=8
987
988 use strict;
989 use warnings;
990
991 use MCE::Map max_workers => 3;
992
993 use CGI::Fast;
994 use FCGI::ProcManager;
995
996 my $count = 0;
997
998 my $proc_manager = FCGI::ProcManager->new({ n_processes => 4 });
999 $proc_manager->pm_manage();
1000
1001 while ( my $query = CGI::Fast->new() ) {
1002 $proc_manager->pm_pre_dispatch();
1003
1004 print "Content-type: text/html\r\n\r\n";
1005 print "$$: ", ++$count, "<br>\n";
1006 print "<hr>\n";
1007
1008 print "$_ = $ENV{$_}<br>\n" foreach sort keys %ENV;
1009 print "<hr>\n";
1010
1011 my %params;
1012
1013 foreach ( sort $query->param() ) {
1014 $params{$_} = $query->param($_);
1015 print $_, " = ", $params{$_}, "<br>\n";
1016 }
1017
1018 print "<hr>\n";
1019
1020 my @ret = mce_map { "$$: ".( $_ * 2 ) } 1 .. $params{'size'} || 8;
1021
1022 print join("<br>\n", @ret), "<br>\n";
1023
1024 $proc_manager->pm_post_dispatch();
1025 }
1026
1027 Initializing MCE options before calling "pm_manage" is not recommended.
1028 The following is one way to do it and does the same thing.
1029
1030 #!/usr/bin/perl
1031
1032 # http://127.0.0.1/cgi-bin/test_mce2.fcgi
1033 # http://127.0.0.1/cgi-bin/test_mce2.fcgi?size=8
1034
1035 use strict;
1036 use warnings;
1037
1038 use MCE::Map;
1039
1040 use CGI::Fast;
1041 use FCGI::ProcManager;
1042
1043 my ($first_time, $count) = (1, 0);
1044
1045 my $proc_manager = FCGI::ProcManager->new({ n_processes => 4 });
1046 $proc_manager->pm_manage();
1047
1048 while ( my $query = CGI::Fast->new() ) {
1049 $proc_manager->pm_pre_dispatch();
1050
1051 print "Content-type: text/html\r\n\r\n";
1052 print "$$: ", ++$count, "<br>\n";
1053 print "<hr>\n";
1054
1055 print "$_ = $ENV{$_}<br>\n" foreach sort keys %ENV;
1056 print "<hr>\n";
1057
1058 my %params;
1059
1060 foreach ( sort $query->param() ) {
1061 $params{$_} = $query->param($_);
1062 print $_, " = ", $params{$_}, "<br>\n";
1063 }
1064
1065 print "<hr>\n";
1066
1067 if ( $first_time ) {
1068 MCE::Map::init( max_workers => 3 );
1069 }
1070
1071 my @ret = mce_map { "$$: ".( $_ * 2 ) } 1 .. $params{'size'} || 8;
1072
1073 print join("<br>\n", @ret), "<br>\n";
1074
1075 $proc_manager->pm_post_dispatch();
1076 }
1077
1078 Sharing data is possible via "MCE::Shared" between "FCGI" and "MCE"
1079 workers. The following is a demonstration utilizing a shared counter
1080 variable which increments by one regardless of the "FCGI" worker
1081 serving the request.
1082
1083 #!/usr/bin/perl
1084
1085 # http://127.0.0.1/cgi-bin/test_mce3.fcgi
1086 # http://127.0.0.1/cgi-bin/test_mce3.fcgi?size=8
1087
1088 use strict;
1089 use warnings;
1090
1091 use MCE::Map;
1092 use MCE::Shared;
1093
1094 use CGI::Fast;
1095 use FCGI::ProcManager;
1096
1097 # Shared variables must be defined before FCGI::ProcManager.
1098 my $count = MCE::Shared->scalar( 0 );
1099 my $first_time = 1;
1100
1101 my $proc_manager = FCGI::ProcManager->new({ n_processes => 4 });
1102 $proc_manager->pm_manage();
1103
1104 # Optional, the following statement must come after $pm->pm_manage.
1105 MCE::Shared->init(); # enables shared parallel-IPC capabilities
1106
1107 while ( my $query = CGI::Fast->new() ) {
1108 $proc_manager->pm_pre_dispatch();
1109
1110 print "Content-type: text/html\r\n\r\n";
1111 print "$$: ", $count->incr(), "<br>\n";
1112 print "<hr>\n";
1113
1114 print "$_ = $ENV{$_}<br>\n" foreach sort keys %ENV;
1115 print "<hr>\n";
1116
1117 my %params;
1118
1119 foreach ( sort $query->param() ) {
1120 $params{$_} = $query->param($_);
1121 print $_, " = ", $params{$_}, "<br>\n";
1122 }
1123
1124 print "<hr>\n";
1125
1126 if ( $first_time ) {
1127 MCE::Map->init( max_workers => 3 );
1128 $first_time = 0;
1129 }
1130
1131 my @ret = mce_map { "$$: ".( $_ * 2 ) } 1 .. $params{'size'} || 8;
1132
1133 print join("<br>\n", @ret), "<br>\n";
1134
1135 $proc_manager->pm_post_dispatch();
1136 }
1137
1138 Resetting the environment is helpful during development. The shared-
1139 manager process stops immediately upon receiving the "TERM" signal.
1140
1141 killall -TERM perl-fcgi perl-fcgi-pm ; service httpd restart
1142
1144 The demonstrations require MCE 1.805 to run. Otherwise, the MCE
1145 "posix_exit" option must be specified and set to 1. This applies to
1146 UNIX only and is set automatically in 1.805 when "Tk.pm" is present.
1147
1148 #!/usr/bin/perl
1149
1150 use strict;
1151 use warnings;
1152
1153 use MCE;
1154 use Tk;
1155
1156 my $mw = MainWindow->new( -title => 'MCE/Tk Test' );
1157
1158 $mw->geometry( '300x300' );
1159 $mw->Button( -text => "Test MCE", -command => \&test_mce )->pack();
1160
1161 my $frame = $mw->Frame->pack( -fill => 'x' );
1162
1163 my $mce = MCE->new(
1164 max_workers => 4,
1165 user_func => sub {
1166 my @args = @{ MCE->user_args() };
1167 print MCE->pid(), ": $_\n";
1168 },
1169 )->spawn;
1170
1171 MainLoop;
1172
1173 # Do not call $mce->shutdown on Windows ($^O eq 'MSWin32').
1174 # Workers terminate with the application.
1175 #
1176 # $mce->shutdown();
1177
1178 print "Exiting...\n";
1179
1180 sub test_mce {
1181 $mce->process({
1182 user_args => [ 'arg1', 'arg2', 'argN' ],
1183 input_data => [ 1 .. 10 ],
1184 chunk_size => 1,
1185 });
1186 }
1187
1188 The following demonstration does the same thing via MCE::Flow.
1189
1190 #!/usr/bin/perl
1191
1192 use strict;
1193 use warnings;
1194
1195 use MCE::Flow max_workers => 4;
1196 use Tk;
1197
1198 my $mw = MainWindow->new( -title => 'MCE/Tk Test' );
1199
1200 $mw->geometry( '300x300' );
1201 $mw->Button( -text => "Test MCE", -command => \&test_mce )->pack();
1202
1203 my $frame = $mw->Frame->pack( -fill => 'x' );
1204
1205 sub task {
1206 my @args = @{ MCE->user_args() };
1207 print MCE->pid(), ": $_\n";
1208 }
1209
1210 MainLoop;
1211
1212 print "Exiting...\n";
1213
1214 sub test_mce {
1215 MCE::Flow->init(
1216 user_args => [ 'arg1', 'arg2', 'argN' ],
1217 chunk_size => 1
1218 );
1219 MCE::Flow->run( \&task, [ 1 .. 10 ] );
1220 }
1221
1222 MCE::Hobo 1.804 or later is another possibility if running on a UNIX
1223 platform.
1224
1225 #!/usr/bin/perl
1226
1227 use strict;
1228 use warnings;
1229
1230 use MCE::Hobo;
1231 use Tk;
1232
1233 my $mw = MainWindow->new( -title => 'MCE/Tk Test' );
1234
1235 $mw->geometry( '300x300' );
1236 $mw->Button( -text => "Test MCE", -command => \&test_mce )->pack();
1237
1238 my $frame = $mw->Frame->pack( -fill => 'x' );
1239
1240 sub task {
1241 my @args = @_;
1242 print MCE::Hobo->pid(), ": $_\n";
1243 }
1244
1245 MainLoop;
1246
1247 print "Exiting...\n";
1248
1249 sub test_mce {
1250 MCE::Hobo->create(\&task, 'arg1', 'arg2', 'argN') for ( 1 .. 4 );
1251 MCE::Hobo->waitall();
1252 }
1253
1255 MCE, MCE::Core
1256
1258 Mario E. Roy, <marioeroy AT gmail DOT com>
1259
1260
1261
1262perl v5.28.0 2018-08-25 MCE::Examples(3)