1MCE::Child(3) User Contributed Perl Documentation MCE::Child(3)
2
3
4
6 MCE::Child - A threads-like parallelization module compatible with Perl
7 5.8
8
10 This document describes MCE::Child version 1.878
11
13 use MCE::Child;
14
15 MCE::Child->init(
16 max_workers => 'auto', # default undef, unlimited
17
18 # Specify a percentage. MCE::Child 1.876+.
19 max_workers => '25%', # 4 on HW with 16 lcores
20 max_workers => '50%', # 8 on HW with 16 lcores
21
22 child_timeout => 20, # default undef, no timeout
23 posix_exit => 1, # default undef, CORE::exit
24 void_context => 1, # default undef
25
26 on_start => sub {
27 my ( $pid, $ident ) = @_;
28 ...
29 },
30 on_finish => sub {
31 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
32 ...
33 }
34 );
35
36 MCE::Child->create( sub { print "Hello from child\n" } )->join();
37
38 sub parallel {
39 my ($arg1) = @_;
40 print "Hello again, $arg1\n" if defined($arg1);
41 print "Hello again, $_\n"; # same thing
42 }
43
44 MCE::Child->create( \&parallel, $_ ) for 1 .. 3;
45
46 my @procs = MCE::Child->list();
47 my @pids = MCE::Child->list_pids();
48 my @running = MCE::Child->list_running();
49 my @joinable = MCE::Child->list_joinable();
50 my $count = MCE::Child->pending();
51
52 # Joining is orderly, e.g. child1 is joined first, child2, child3.
53 $_->join() for @procs; # (or)
54 $_->join() for @joinable;
55
56 # Joining occurs immediately as child processes complete execution.
57 1 while MCE::Child->wait_one();
58
59 my $child = mce_child { foreach (@files) { ... } };
60
61 $child->join();
62
63 if ( my $err = $child->error() ) {
64 warn "Child error: $err\n";
65 }
66
67 # Get a child's object
68 $child = MCE::Child->self();
69
70 # Get a child's ID
71 $pid = MCE::Child->pid(); # $$
72 $pid = $child->pid();
73 $pid = MCE::Child->tid(); # tid is an alias for pid
74 $pid = $child->tid();
75
76 # Test child objects
77 if ( $child1 == $child2 ) {
78 ...
79 }
80
81 # Give other workers a chance to run
82 MCE::Child->yield();
83 MCE::Child->yield(0.05);
84
85 # Return context, wantarray aware
86 my ($value1, $value2) = $child->join();
87 my $value = $child->join();
88
89 # Check child's state
90 if ( $child->is_running() ) {
91 sleep 1;
92 }
93 if ( $child->is_joinable() ) {
94 $child->join();
95 }
96
97 # Send a signal to a child
98 $child->kill('SIGUSR1');
99
100 # Exit a child
101 MCE::Child->exit(0);
102 MCE::Child->exit(0, @ret);
103
105 MCE::Child is a fork of MCE::Hobo for compatibility with Perl 5.8.
106
107 A child is a migratory worker inside the machine that carries the
108 asynchronous gene. Child processes are equipped with "threads"-like
109 capability for running code asynchronously. Unlike threads, each child
110 is a unique process to the underlying OS. The IPC is handled via
111 "MCE::Channel", which runs on all the major platforms including Cygwin
112 and Strawberry Perl.
113
114 "MCE::Child" may be used as a standalone or together with "MCE"
115 including running alongside "threads".
116
117 use MCE::Child;
118 use MCE::Shared;
119
120 # synopsis: head -20 file.txt | perl script.pl
121
122 my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
123 my $ofh = MCE::Shared->handle( ">", \*STDOUT );
124 my $ary = MCE::Shared->array();
125
126 sub parallel_task {
127 my ( $id ) = @_;
128 while ( <$ifh> ) {
129 printf {$ofh} "[ %4d ] %s", $., $_;
130 # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n"; # dereferencing
131 $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
132 }
133 }
134
135 my $child1 = MCE::Child->new( "parallel_task", 1 );
136 my $child2 = MCE::Child->new( \&parallel_task, 2 );
137 my $child3 = MCE::Child->new( sub { parallel_task(3) } );
138
139 $_->join for MCE::Child->list(); # ditto: MCE::Child->wait_all();
140
141 # search array (total one round-trip via IPC)
142 my @vals = $ary->vals( "val =~ / ID 2 /" );
143
144 print {*STDERR} join("", @vals);
145
147 $child = MCE::Child->create( FUNCTION, ARGS )
148 $child = MCE::Child->new( FUNCTION, ARGS )
149 This will create a new child process that will begin execution with
150 function as the entry point, and optionally ARGS for list of
151 parameters. It will return the corresponding MCE::Child object, or
152 undef if child creation failed.
153
154 FUNCTION may either be the name of a function, an anonymous
155 subroutine, or a code ref.
156
157 my $child = MCE::Child->create( "func_name", ... );
158 # or
159 my $child = MCE::Child->create( sub { ... }, ... );
160 # or
161 my $child = MCE::Child->create( \&func, ... );
162
163 $child = MCE::Child->create( { options }, FUNCTION, ARGS )
164 $child = MCE::Child->create( IDENT, FUNCTION, ARGS )
165 Options, excluding "ident", may be specified globally via the "init"
166 function. Otherwise, "ident", "child_timeout", "posix_exit", and
167 "void_context" may be set uniquely.
168
169 The "ident" option is used by callback functions "on_start" and
170 "on_finish" for identifying the started and finished child process
171 respectively.
172
173 my $child1 = MCE::Child->create( { posix_exit => 1 }, sub {
174 ...
175 } );
176
177 $child1->join;
178
179 my $child2 = MCE::Child->create( { child_timeout => 3 }, sub {
180 sleep 1 for ( 1 .. 9 );
181 } );
182
183 $child2->join;
184
185 if ( $child2->error() eq "Child timed out\n" ) {
186 ...
187 }
188
189 The "new()" method is an alias for "create()".
190
191 mce_child { BLOCK } ARGS;
192 mce_child { BLOCK };
193 "mce_child" runs the block asynchronously similarly to
194 "MCE::Child->create()". It returns the child object, or undef if
195 child creation failed.
196
197 my $child = mce_child { foreach (@files) { ... } };
198
199 $child->join();
200
201 if ( my $err = $child->error() ) {
202 warn("Child error: $err\n");
203 }
204
205 $child->join()
206 This will wait for the corresponding child process to complete its
207 execution. In non-voided context, "join()" will return the value(s)
208 of the entry point function.
209
210 The context (void, scalar or list) for the return value(s) for
211 "join" is determined at the time of joining and mostly "wantarray"
212 aware.
213
214 my $child1 = MCE::Child->create( sub {
215 my @res = qw(foo bar baz);
216 return (@res);
217 });
218
219 my @res1 = $child1->join(); # ( foo, bar, baz )
220 my $res1 = $child1->join(); # baz
221
222 my $child2 = MCE::Child->create( sub {
223 return 'foo';
224 });
225
226 my @res2 = $child2->join(); # ( foo )
227 my $res2 = $child2->join(); # foo
228
229 $child1->equal( $child2 )
230 Tests if two child objects are the same child or not. Child
231 comparison is based on process IDs. This is overloaded to the more
232 natural forms.
233
234 if ( $child1 == $child2 ) {
235 print("Child objects are the same\n");
236 }
237 # or
238 if ( $child1 != $child2 ) {
239 print("Child objects differ\n");
240 }
241
242 $child->error()
243 Child processes are executed in an "eval" context. This method will
244 return "undef" if the child terminates normally. Otherwise, it
245 returns the value of $@ associated with the child's execution status
246 in its "eval" context.
247
248 $child->exit()
249 This sends 'SIGQUIT' to the child process, notifying the child to
250 exit. It returns the child object to allow for method chaining. It
251 is important to join later if not immediately to not leave a zombie
252 or defunct process.
253
254 $child->exit()->join();
255 ...
256
257 $child->join(); # later
258
259 MCE::Child->exit( 0 )
260 MCE::Child->exit( 0, @ret )
261 A child can exit at any time by calling "MCE::Child->exit()".
262 Otherwise, the behavior is the same as "exit(status)" when called
263 from the main process. The child process may optionally return data,
264 to be sent via IPC.
265
266 MCE::Child->finish()
267 This class method is called automatically by "END", but may be
268 called explicitly. An error is emitted via croak if there are active
269 child processes not yet joined.
270
271 MCE::Child->create( 'task1', $_ ) for 1 .. 4;
272 $_->join for MCE::Child->list();
273
274 MCE::Child->create( 'task2', $_ ) for 1 .. 4;
275 $_->join for MCE::Child->list();
276
277 MCE::Child->create( 'task3', $_ ) for 1 .. 4;
278 $_->join for MCE::Child->list();
279
280 MCE::Child->finish();
281
282 MCE::Child->init( options )
283 The init function accepts a list of MCE::Child options.
284
285 MCE::Child->init(
286 max_workers => 'auto', # default undef, unlimited
287
288 # Specify a percentage. MCE::Child 1.876+.
289 max_workers => '25%', # 4 on HW with 16 lcores
290 max_workers => '50%', # 8 on HW with 16 lcores
291
292 child_timeout => 20, # default undef, no timeout
293 posix_exit => 1, # default undef, CORE::exit
294 void_context => 1, # default undef
295
296 on_start => sub {
297 my ( $pid, $ident ) = @_;
298 ...
299 },
300 on_finish => sub {
301 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
302 ...
303 }
304 );
305
306 # Identification given as an option or the 1st argument.
307
308 for my $key ( 'aa' .. 'zz' ) {
309 MCE::Child->create( { ident => $key }, sub { ... } );
310 MCE::Child->create( $key, sub { ... } );
311 }
312
313 MCE::Child->wait_all;
314
315 Set "max_workers" if you want to limit the number of workers by
316 waiting automatically for an available slot. Specify a percentage or
317 "auto" to obtain the number of logical cores via
318 "MCE::Util::get_ncpu()".
319
320 Set "child_timeout", in number of seconds, if you want the child
321 process to terminate after some time. The default is 0 for no
322 timeout.
323
324 Set "posix_exit" to avoid all END and destructor processing.
325 Constructing MCE::Child inside a thread implies 1 or if present CGI,
326 FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
327 Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
328
329 Set "void_context" to create the child process in void context for
330 the return value. Otherwise, the return context is wantarray-aware
331 for "join()" and "result()" and determined when retrieving the data.
332
333 The callback options "on_start" and "on_finish" are called in the
334 parent process after starting the worker and later when terminated.
335 The arguments for the subroutines were inspired by
336 Parallel::ForkManager.
337
338 The parameters for "on_start" are the following:
339
340 - pid of the child process
341 - identification (ident option or 1st arg to create)
342
343 The parameters for "on_finish" are the following:
344
345 - pid of the child process
346 - program exit code
347 - identification (ident option or 1st arg to create)
348 - exit signal id
349 - error message from eval inside MCE::Child
350 - returned data
351
352 $child->is_running()
353 Returns true if a child is still running.
354
355 $child->is_joinable()
356 Returns true if the child has finished running and not yet joined.
357
358 $child->kill( 'SIG...' )
359 Sends the specified signal to the child. Returns the child object to
360 allow for method chaining. As with "exit", it is important to join
361 eventually if not immediately to not leave a zombie or defunct
362 process.
363
364 $child->kill('SIG...')->join();
365
366 The following is a parallel demonstration comparing "MCE::Shared"
367 against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
368 after all workers have been notified to quit.
369
370 use Time::HiRes qw(time);
371
372 use Redis;
373 use Redis::Fast;
374
375 use MCE::Child;
376 use MCE::Shared;
377
378 my $redis = Redis->new();
379 my $rfast = Redis::Fast->new();
380 my $array = MCE::Shared->array();
381
382 sub parallel_redis {
383 my ($_redis) = @_;
384 my ($count, $quit, $len) = (0, 0);
385
386 # instead, use a flag to exit loop
387 $SIG{'QUIT'} = sub { $quit = 1 };
388
389 while () {
390 $len = $_redis->rpush('list', $count++);
391 last if $quit;
392 }
393
394 $count;
395 }
396
397 sub parallel_array {
398 my ($count, $quit, $len) = (0, 0);
399
400 # do not exit from inside handler
401 $SIG{'QUIT'} = sub { $quit = 1 };
402
403 while () {
404 $len = $array->push($count++);
405 last if $quit;
406 }
407
408 $count;
409 }
410
411 sub benchmark_this {
412 my ($desc, $num_procs, $timeout, $code, @args) = @_;
413 my ($start, $total) = (time(), 0);
414
415 MCE::Child->new($code, @args) for 1..$num_procs;
416 sleep $timeout;
417
418 # joining is not immediate; ok
419 $_->kill('QUIT') for MCE::Child->list();
420
421 # joining later; ok
422 $total += $_->join() for MCE::Child->list();
423
424 printf "$desc <> duration: %0.03f secs, count: $total\n",
425 time() - $start;
426
427 sleep 0.2;
428 }
429
430 benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
431 benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
432 benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
433
434 MCE::Child->list()
435 Returns a list of all child objects not yet joined.
436
437 @procs = MCE::Child->list();
438
439 MCE::Child->list_pids()
440 Returns a list of all child pids not yet joined (available since
441 1.849).
442
443 @pids = MCE::Child->list_pids();
444
445 $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
446 # Signal workers all at once
447 CORE::kill('KILL', MCE::Child->list_pids());
448 exec('reset');
449 };
450
451 MCE::Child->list_running()
452 Returns a list of all child objects that are still running.
453
454 @procs = MCE::Child->list_running();
455
456 MCE::Child->list_joinable()
457 Returns a list of all child objects that have completed running.
458 Thus, ready to be joined without blocking.
459
460 @procs = MCE::Child->list_joinable();
461
462 MCE::Child->max_workers([ N ])
463 Getter and setter for max_workers. Specify a number or 'auto' to
464 acquire the total number of cores via MCE::Util::get_ncpu. Specify a
465 false value to set back to no limit.
466
467 MCE::Child->pending()
468 Returns a count of all child objects not yet joined.
469
470 $count = MCE::Child->pending();
471
472 $child->result()
473 Returns the result obtained by "join", "wait_one", or "wait_all". If
474 the process has not yet exited, waits for the corresponding child to
475 complete its execution.
476
477 use MCE::Child;
478 use Time::HiRes qw(sleep);
479
480 sub task {
481 my ($id) = @_;
482 sleep $id * 0.333;
483 return $id;
484 }
485
486 MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
487
488 # 1 while MCE::Child->wait_one();
489
490 while ( my $child = MCE::Child->wait_one() ) {
491 my $err = $child->error() || 'no error';
492 my $res = $child->result();
493 my $pid = $child->pid();
494
495 print "[$pid] $err : $res\n";
496 }
497
498 Like "join" described above, the context (void, scalar or list) for
499 the return value(s) is determined at the time "result" is called and
500 mostly "wantarray" aware.
501
502 my $child1 = MCE::Child->create( sub {
503 my @res = qw(foo bar baz);
504 return (@res);
505 });
506
507 my @res1 = $child1->result(); # ( foo, bar, baz )
508 my $res1 = $child1->result(); # baz
509
510 my $child2 = MCE::Child->create( sub {
511 return 'foo';
512 });
513
514 my @res2 = $child2->result(); # ( foo )
515 my $res2 = $child2->result(); # foo
516
517 MCE::Child->self()
518 Class method that allows a child to obtain its own MCE::Child
519 object.
520
521 $child->pid()
522 $child->tid()
523 Returns the ID of the child.
524
525 pid: $$ process id
526 tid: $$ alias for pid
527
528 MCE::Child->pid()
529 MCE::Child->tid()
530 Class methods that allow a child to obtain its own ID.
531
532 pid: $$ process id
533 tid: $$ alias for pid
534
535 MCE::Child->wait_one()
536 MCE::Child->waitone()
537 MCE::Child->wait_all()
538 MCE::Child->waitall()
539 Meaningful for the manager process only, waits for one or all child
540 processes to complete execution. Afterwards, returns the
541 corresponding child objects. If a child doesn't exist, returns the
542 "undef" value or an empty list for "wait_one" and "wait_all"
543 respectively.
544
545 The "waitone" and "waitall" methods are aliases for compatibility
546 with "MCE::Hobo".
547
548 use MCE::Child;
549 use Time::HiRes qw(sleep);
550
551 sub task {
552 my $id = shift;
553 sleep $id * 0.333;
554 return $id;
555 }
556
557 MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
558
559 # join, traditional use case
560 $_->join() for MCE::Child->list();
561
562 # wait_one, simplistic use case
563 1 while MCE::Child->wait_one();
564
565 # wait_one
566 while ( my $child = MCE::Child->wait_one() ) {
567 my $err = $child->error() || 'no error';
568 my $res = $child->result();
569 my $pid = $child->pid();
570
571 print "[$pid] $err : $res\n";
572 }
573
574 # wait_all
575 my @procs = MCE::Child->wait_all();
576
577 for ( @procs ) {
578 my $err = $_->error() || 'no error';
579 my $res = $_->result();
580 my $pid = $_->pid();
581
582 print "[$pid] $err : $res\n";
583 }
584
585 MCE::Child->yield( [ floating_seconds ] )
586 Give other workers a chance to run, optionally for given time. Yield
587 behaves similarly to MCE's interval option. It throttles workers
588 from running too fast. A demonstration is provided in the next
589 section for fetching URLs in parallel.
590
591 The default "floating_seconds" is 0.008 and 0.015 on UNIX and
592 Windows, respectively. Pass 0 if simply wanting to give other
593 workers a chance to run.
594
595 # total run time: 1.00 second
596
597 MCE::Child->create( sub { MCE::Child->yield(0.25) } ) for 1 .. 4;
598 MCE::Child->wait_all();
599
601 Threads-like detach capability was added starting with the 1.867
602 release.
603
604 A threads example is shown first followed by the MCE::Child example.
605 All one needs to do is set the CHLD signal handler to IGNORE.
606 Unfortunately, this works on UNIX platforms only. The child process
607 restores the CHLD handler to default, so it is able to spawn nested workers
608 and reap them if desired.
609
610 use threads;
611
612 for ( 1 .. 8 ) {
613 async {
614 # do something
615 }->detach;
616 }
617
618 use MCE::Child;
619
620 # Have the OS reap workers automatically when exiting.
621 # The on_finish option is ignored if specified (no-op).
622 # Ensure not inside a thread on UNIX platforms.
623
624 $SIG{CHLD} = 'IGNORE';
625
626 for ( 1 .. 8 ) {
627 mce_child {
628 # do something
629 };
630 }
631
632 # Optionally, wait for any remaining workers before leaving.
633 # This is necessary if workers are consuming shared objects,
634 # constructed via MCE::Shared.
635
636 MCE::Child->wait_all;
637
638 The following is another way and works on Windows. Here, the on_finish
639 handler works as usual.
640
641 use MCE::Child;
642
643 MCE::Child->init(
644 on_finish => sub {
645 ...
646 },
647 );
648
649 for ( 1 .. 8 ) {
650 $_->join for MCE::Child->list_joinable;
651 mce_child {
652 # do something
653 };
654 }
655
656 MCE::Child->wait_all;
657
659 MCE::Child behaves similarly to threads for the most part. It also
660 provides Parallel::ForkManager-like capabilities. The
661 "Parallel::ForkManager" example is shown first followed by a version
662 using "MCE::Child".
663
664 Parallel::ForkManager
665 use strict;
666 use warnings;
667
668 use Parallel::ForkManager;
669 use Time::HiRes 'time';
670
671 my $start = time;
672
673 my $pm = Parallel::ForkManager->new(10);
674 $pm->set_waitpid_blocking_sleep(0);
675
676 $pm->run_on_finish( sub {
677 my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
678 print "child $pid completed: $ident => ", $resp->[0], "\n";
679 });
680
681 DATA_LOOP:
682 foreach my $data ( 1..2000 ) {
683 # forks and returns the pid for the child
684 my $pid = $pm->start($data) and next DATA_LOOP;
685 my $ret = [ $data * 2 ];
686
687 $pm->finish(0, $ret);
688 }
689
690 $pm->wait_all_children;
691
692 printf STDERR "duration: %0.03f seconds\n", time - $start;
693
694 MCE::Child
695 use strict;
696 use warnings;
697
698 use MCE::Child 1.843;
699 use Time::HiRes 'time';
700
701 my $start = time;
702
703 MCE::Child->init(
704 max_workers => 10,
705 on_finish => sub {
706 my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
707 print "child $pid completed: $ident => ", $resp->[0], "\n";
708 }
709 );
710
711 foreach my $data ( 1..2000 ) {
712 MCE::Child->create( $data, sub {
713 [ $data * 2 ];
714 });
715 }
716
717 MCE::Child->wait_all;
718
719 printf STDERR "duration: %0.03f seconds\n", time - $start;
720
721 Time to spin 2,000 workers and obtain results (in seconds).
722 Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
723 Boost). Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
724 with Moo loaded at the top of the script.
725
726 MCE::Hobo uses MCE::Shared to retrieve data during reaping.
727 MCE::Child uses MCE::Channel, no shared-manager.
728
729 Version Cygwin Windows Linux macOS FreeBSD
730
731 MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s
732 MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s
733 P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s
734
735 MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded
736 MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded
737 P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used
738
739 Set posix_exit to avoid all END and destructor processing.
740 This is helpful for reducing overhead when workers exit. Ditto if
741 using a Perl module not parallel safe. The option is ignored on
742 Windows "$^O eq 'MSWin32'".
743
744 MCE::Child->init( posix_exit => 1, ... );
745 MCE::Hobo->init( posix_exit => 1, ... );
746
747 Version Cygwin Windows Linux macOS FreeBSD
748
749 MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded
750 MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded
751
753 This demonstration constructs two queues, two handles, starts the
754 shared-manager process if needed, and spawns four workers. For this
755 demonstration, URLs are chunked 64 per job. In reality, one may run
756 with 200 workers and chunk 300 URLs on a 24-way box.
757
758 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
759 # perl demo.pl -- all output
760 # perl demo.pl >/dev/null -- mngr/child output
761 # perl demo.pl 2>/dev/null -- show results only
762 #
763 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
764
765 use strict;
766 use warnings;
767
768 use AnyEvent;
769 use AnyEvent::HTTP;
770 use Time::HiRes qw( time );
771
772 use MCE::Child;
773 use MCE::Shared;
774
775 # Construct two queues, input and return.
776
777 my $que = MCE::Shared->queue();
778 my $ret = MCE::Shared->queue();
779
780 # Construct shared handles for serializing output from many workers
781 # writing simultaneously. This prevents garbled output.
782
783 mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
784 mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
785
786 # Spawn workers early for minimum memory consumption.
787
788 MCE::Child->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
789
790 # Obtain or generate input data for workers to process.
791
792 my ( $count, @urls ) = ( 0 );
793
794 push @urls, map { "http://127.0.0.$_/" } 1..254;
795 push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
796
797 while ( @urls ) {
798 my @chunk = splice(@urls, 0, 64);
799 $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
800 }
801
802 # So that workers leave the loop after consuming the queue.
803
804 $que->end();
805
806 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
807 # Loop for the manager process. The manager may do other work if
808 # need be and periodically check $ret->pending() not shown here.
809 #
810 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
811
812 my $start = time;
813
814 printf {$ERR} "Mngr - entering loop\n";
815
816 while ( $count ) {
817 my ( $result, $failed ) = $ret->dequeue( 2 );
818
819 # Remove ID from result, so not treated as a URL item.
820
821 printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
822
823 # Display the URL and the size captured.
824
825 foreach my $url ( keys %{ $result } ) {
826 printf {$OUT} "%s: %d\n", $url, length($result->{$url})
827 if $result->{$url}; # url has content
828 }
829
830 # Display URLs could not reach.
831
832 if ( @{ $failed } ) {
833 foreach my $url ( @{ $failed } ) {
834 print {$OUT} "Failed: $url\n";
835 }
836 }
837
838 # Decrement the count.
839
840 $count--;
841 }
842
843 MCE::Child->wait_all();
844
845 printf {$ERR} "Mngr - exiting loop\n\n";
846 printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
847
848 exit;
849
850 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
851 # Child processes enqueue two items ( $result and $failed ) per each
852 # job for the manager process. Likewise, the manager process dequeues
853 # two items above. Optionally, child processes may include the ID in
854 # the result.
855 #
856 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
857
858 sub task {
859 my ( $id ) = @_;
860 printf {$ERR} "Child $id entering loop\n";
861
862 while ( my $job = $que->dequeue() ) {
863 my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
864
865 # Walk URLs, provide a hash and array refs for data.
866
867 printf {$ERR} "Child $id running job $job->{ID}\n";
868 walk( $job, $result, $failed );
869
870 # Send results to the manager process.
871
872 $ret->enqueue( $result, $failed );
873 }
874
875 printf {$ERR} "Child $id exiting loop\n";
876 }
877
878 sub walk {
879 my ( $job, $result, $failed ) = @_;
880
881 # Yielding is critical when running an event loop in parallel.
882 # Not doing so means that the app may reach contention points
883 # with the firewall and likely impose unnecessary hardship at
884 # the OS level. The idea here is not to have multiple workers
885 # initiate HTTP requests to a batch of URLs at the same time.
886 # Yielding behaves similarly to scatter, letting the child
887 # process run solo for a fraction of time.
888
889 MCE::Child->yield( 0.03 );
890
891 my $cv = AnyEvent->condvar();
892
893 # Populate the hash ref for the URLs it could reach.
894 # Do not mix AnyEvent timeout with child timeout.
895 # Therefore, choose event timeout when available.
896
897 foreach my $url ( @{ $job->{INPUT} } ) {
898 $cv->begin();
899 http_get $url, timeout => 2, sub {
900 my ( $data, $headers ) = @_;
901 $result->{$url} = $data;
902 $cv->end();
903 };
904 }
905
906 $cv->recv();
907
908 # Populate the array ref for URLs it could not reach.
909
910 foreach my $url ( @{ $job->{INPUT} } ) {
911 push @{ $failed }, $url unless (exists $result->{ $url });
912 }
913
914 return;
915 }
916
917 __END__
918
919 $ perl demo.pl
920
921 Child 1 entering loop
922 Child 2 entering loop
923 Child 3 entering loop
924 Mngr - entering loop
925 Child 2 running job 2
926 Child 3 running job 3
927 Child 1 running job 1
928 Child 4 entering loop
929 Child 4 running job 4
930 Child 2 running job 5
931 Mngr - received job 2
932 Child 3 running job 6
933 Mngr - received job 3
934 Child 1 running job 7
935 Mngr - received job 1
936 Child 4 running job 8
937 Mngr - received job 4
938 http://192.168.0.1/: 3729
939 Child 2 exiting loop
940 Mngr - received job 5
941 Child 3 exiting loop
942 Mngr - received job 6
943 Child 1 exiting loop
944 Mngr - received job 7
945 Child 4 exiting loop
946 Mngr - received job 8
947 Mngr - exiting loop
948
949 Duration: 4.131 seconds
950
952 Making an executable is possible with the PAR::Packer module. On the
953 Windows platform, threads, threads::shared, and exiting via threads are
954 necessary for the binary to exit successfully.
955
956 # https://metacpan.org/pod/PAR::Packer
957 # https://metacpan.org/pod/pp
958 #
959 # pp -o demo.exe demo.pl
960 # ./demo.exe
961
962 use strict;
963 use warnings;
964
965 use if $^O eq "MSWin32", "threads";
966 use if $^O eq "MSWin32", "threads::shared";
967
968 # Include minimum dependencies for MCE::Child.
969 # Add other modules required by your application here.
970
971 use Storable ();
972 use Time::HiRes ();
973
974 # use IO::FDPass (); # optional: for condvar, handle, queue
975 # use Sereal (); # optional: for faster serialization
976
977 use MCE::Child;
978 use MCE::Shared;
979
980 # For PAR to work on the Windows platform, one must include manually
981 # any shared modules used by the application.
982
983 # use MCE::Shared::Array; # if using MCE::Shared->array
984 # use MCE::Shared::Cache; # if using MCE::Shared->cache
985 # use MCE::Shared::Condvar; # if using MCE::Shared->condvar
986 # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
987 # use MCE::Shared::Hash; # if using MCE::Shared->hash
988 # use MCE::Shared::Minidb; # if using MCE::Shared->minidb
989 # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
990 # use MCE::Shared::Queue; # if using MCE::Shared->queue
991 # use MCE::Shared::Scalar; # if using MCE::Shared->scalar
992
993 # Et cetera. Only load modules needed for your application.
994
995 use MCE::Shared::Sequence; # if using MCE::Shared->sequence
996
997 my $seq = MCE::Shared->sequence( 1, 9 );
998
999 sub task {
1000 my ( $id ) = @_;
1001 while ( defined ( my $num = $seq->next() ) ) {
1002 print "$id: $num\n";
1003 sleep 1;
1004 }
1005 }
1006
1007 sub main {
1008 MCE::Child->new( \&task, $_ ) for 1 .. 3;
1009 MCE::Child->wait_all();
1010 }
1011
1012 # Main must run inside a thread on the Windows platform or workers
1013 # will fail during exit, causing the exe to crash. The reason is
1014 # that PAR or a dependency isn't multi-process safe.
1015
1016 ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
1017
1018 threads->exit(0) if $INC{"threads.pm"};
1019
1021 MCE::Child emits an error when "is_joinable", "is_running", or "join"
1022 is called by a process other than the manager, where the child was spawned. This
1023 is a limitation in MCE::Child only due to not involving a shared-
1024 manager process for IPC.
1025
1026 This use-case is not typical.
1027
1029 The inspiration for "MCE::Child" comes from wanting "threads"-like
1030 behavior for processes compatible with Perl 5.8. Both can run side-by-
1031 side including safe-use by MCE workers. Likewise, the documentation
1032 resembles "threads".
1033
1034 The inspiration for "wait_all" and "wait_one" comes from the
1035 "Parallel::WorkUnit" module.
1036
1038 • forks
1039
1040 • forks::BerkeleyDB
1041
1042 • MCE::Hobo
1043
1044 • Parallel::ForkManager
1045
1046 • Parallel::Loops
1047
1048 • Parallel::Prefork
1049
1050 • Parallel::WorkUnit
1051
1052 • Proc::Fork
1053
1054 • Thread::Tie
1055
1056 • threads
1057
1059 MCE, MCE::Channel, MCE::Shared
1060
1062 Mario E. Roy, <marioeroy AT gmail DOT com>
1063
1064
1065
1066perl v5.34.0 2022-02-20 MCE::Child(3)