1MCE::Child(3) User Contributed Perl Documentation MCE::Child(3)
2
3
4
6 MCE::Child - A threads-like parallelization module compatible with Perl
7 5.8
8
10 This document describes MCE::Child version 1.874
11
13 use MCE::Child;
14
15 MCE::Child->init(
16 max_workers => 'auto', # default undef, unlimited
17 child_timeout => 20, # default undef, no timeout
18 posix_exit => 1, # default undef, CORE::exit
19 void_context => 1, # default undef
20 on_start => sub {
21 my ( $pid, $ident ) = @_;
22 ...
23 },
24 on_finish => sub {
25 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
26 ...
27 }
28 );
29
30 MCE::Child->create( sub { print "Hello from child\n" } )->join();
31
32 sub parallel {
33 my ($arg1) = @_;
34 print "Hello again, $arg1\n" if defined($arg1);
35 print "Hello again, $_\n"; # same thing
36 }
37
38 MCE::Child->create( \&parallel, $_ ) for 1 .. 3;
39
40 my @procs = MCE::Child->list();
41 my @pids = MCE::Child->list_pids();
42 my @running = MCE::Child->list_running();
43 my @joinable = MCE::Child->list_joinable();
44 my $count = MCE::Child->pending();
45
46 # Joining is orderly, e.g. child1 is joined first, child2, child3.
47 $_->join() for @procs; # (or)
48 $_->join() for @joinable;
49
50 # Joining occurs immediately as child processes complete execution.
51 1 while MCE::Child->wait_one();
52
53 my $child = mce_child { foreach (@files) { ... } };
54
55 $child->join();
56
57 if ( my $err = $child->error() ) {
58 warn "Child error: $err\n";
59 }
60
61 # Get a child's object
62 $child = MCE::Child->self();
63
64 # Get a child's ID
65 $pid = MCE::Child->pid(); # $$
66 $pid = $child->pid();
67 $pid = MCE::Child->tid(); # tid is an alias for pid
68 $pid = $child->tid();
69
70 # Test child objects
71 if ( $child1 == $child2 ) {
72 ...
73 }
74
75 # Give other workers a chance to run
76 MCE::Child->yield();
77 MCE::Child->yield(0.05);
78
79 # Return context, wantarray aware
80 my ($value1, $value2) = $child->join();
81 my $value = $child->join();
82
83 # Check child's state
84 if ( $child->is_running() ) {
85 sleep 1;
86 }
87 if ( $child->is_joinable() ) {
88 $child->join();
89 }
90
91 # Send a signal to a child
92 $child->kill('SIGUSR1');
93
94 # Exit a child
95 MCE::Child->exit(0);
96 MCE::Child->exit(0, @ret);
97
99 MCE::Child is a fork of MCE::Hobo for compatibility with Perl 5.8.
100
101 A child is a migratory worker inside the machine that carries the
102 asynchronous gene. Child processes are equipped with "threads"-like
103 capability for running code asynchronously. Unlike threads, each child
104 is a unique process to the underlying OS. The IPC is handled via
105 "MCE::Channel", which runs on all the major platforms including Cygwin
106 and Strawberry Perl.
107
108 "MCE::Child" may be used as a standalone or together with "MCE"
109 including running alongside "threads".
110
111 use MCE::Child;
112 use MCE::Shared;
113
114 # synopsis: head -20 file.txt | perl script.pl
115
116 my $ifh = MCE::Shared->handle( "<", \*STDIN ); # shared
117 my $ofh = MCE::Shared->handle( ">", \*STDOUT );
118 my $ary = MCE::Shared->array();
119
120 sub parallel_task {
121 my ( $id ) = @_;
122 while ( <$ifh> ) {
123 printf {$ofh} "[ %4d ] %s", $., $_;
124 # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n"; # dereferencing
125 $ary->set( $. - 1, "[ ID $id ] read line $.\n" ); # faster via OO
126 }
127 }
128
129 my $child1 = MCE::Child->new( "parallel_task", 1 );
130 my $child2 = MCE::Child->new( \&parallel_task, 2 );
131 my $child3 = MCE::Child->new( sub { parallel_task(3) } );
132
133 $_->join for MCE::Child->list(); # ditto: MCE::Child->wait_all();
134
135 # search array (total one round-trip via IPC)
136 my @vals = $ary->vals( "val =~ / ID 2 /" );
137
138 print {*STDERR} join("", @vals);
139
141 $child = MCE::Child->create( FUNCTION, ARGS )
142 $child = MCE::Child->new( FUNCTION, ARGS )
143 This will create a new child process that will begin execution with
144 function as the entry point, and optionally ARGS for list of
145 parameters. It will return the corresponding MCE::Child object, or
146 undef if child creation failed.
147
148 FUNCTION may either be the name of a function, an anonymous
149 subroutine, or a code ref.
150
151 my $child = MCE::Child->create( "func_name", ... );
152 # or
153 my $child = MCE::Child->create( sub { ... }, ... );
154 # or
155 my $child = MCE::Child->create( \&func, ... );
156
157 $child = MCE::Child->create( { options }, FUNCTION, ARGS )
158 $child = MCE::Child->create( IDENT, FUNCTION, ARGS )
159 Options, excluding "ident", may be specified globally via the "init"
160 function. Otherwise, "ident", "child_timeout", "posix_exit", and
161 "void_context" may be set uniquely.
162
163 The "ident" option is used by callback functions "on_start" and
164 "on_finish" for identifying the started and finished child process
165 respectively.
166
167 my $child1 = MCE::Child->create( { posix_exit => 1 }, sub {
168 ...
169 } );
170
171 $child1->join;
172
173 my $child2 = MCE::Child->create( { child_timeout => 3 }, sub {
174 sleep 1 for ( 1 .. 9 );
175 } );
176
177 $child2->join;
178
179 if ( $child2->error() eq "Child timed out\n" ) {
180 ...
181 }
182
183 The "new()" method is an alias for "create()".
184
185 mce_child { BLOCK } ARGS;
186 mce_child { BLOCK };
187 "mce_child" runs the block asynchronously similarly to
188 "MCE::Child->create()". It returns the child object, or undef if
189 child creation failed.
190
191 my $child = mce_child { foreach (@files) { ... } };
192
193 $child->join();
194
195 if ( my $err = $child->error() ) {
196 warn("Child error: $err\n");
197 }
198
199 $child->join()
200 This will wait for the corresponding child process to complete its
201 execution. In non-voided context, "join()" will return the value(s)
202 of the entry point function.
203
204 The context (void, scalar or list) for the return value(s) for
205 "join" is determined at the time of joining and mostly "wantarray"
206 aware.
207
208 my $child1 = MCE::Child->create( sub {
209 my @res = qw(foo bar baz);
210 return (@res);
211 });
212
213 my @res1 = $child1->join(); # ( foo, bar, baz )
214 my $res1 = $child1->join(); # baz
215
216 my $child2 = MCE::Child->create( sub {
217 return 'foo';
218 });
219
220 my @res2 = $child2->join(); # ( foo )
221 my $res2 = $child2->join(); # foo
222
223 $child1->equal( $child2 )
224 Tests if two child objects are the same child or not. Child
225 comparison is based on process IDs. This is overloaded to the more
226 natural forms.
227
228 if ( $child1 == $child2 ) {
229 print("Child objects are the same\n");
230 }
231 # or
232 if ( $child1 != $child2 ) {
233 print("Child objects differ\n");
234 }
235
236 $child->error()
237 Child processes are executed in an "eval" context. This method will
238 return "undef" if the child terminates normally. Otherwise, it
239 returns the value of $@ associated with the child's execution status
240 in its "eval" context.
241
242 $child->exit()
243 This sends 'SIGQUIT' to the child process, notifying the child to
244 exit. It returns the child object to allow for method chaining. It
245 is important to join later if not immediately to not leave a zombie
246 or defunct process.
247
248 $child->exit()->join();
249 ...
250
251 $child->join(); # later
252
253 MCE::Child->exit( 0 )
254 MCE::Child->exit( 0, @ret )
255 A child can exit at any time by calling "MCE::Child->exit()".
256 Otherwise, the behavior is the same as "exit(status)" when called
257 from the main process. The child process may optionally return data,
258 to be sent via IPC.
259
260 MCE::Child->finish()
261 This class method is called automatically by "END", but may be
262 called explicitly. An error is emitted via croak if there are active
263 child processes not yet joined.
264
265 MCE::Child->create( 'task1', $_ ) for 1 .. 4;
266 $_->join for MCE::Child->list();
267
268 MCE::Child->create( 'task2', $_ ) for 1 .. 4;
269 $_->join for MCE::Child->list();
270
271 MCE::Child->create( 'task3', $_ ) for 1 .. 4;
272 $_->join for MCE::Child->list();
273
274 MCE::Child->finish();
275
276 MCE::Child->init( options )
277 The init function accepts a list of MCE::Child options.
278
279 MCE::Child->init(
280 max_workers => 'auto', # default undef, unlimited
281 child_timeout => 20, # default undef, no timeout
282 posix_exit => 1, # default undef, CORE::exit
283 void_context => 1, # default undef
284 on_start => sub {
285 my ( $pid, $ident ) = @_;
286 ...
287 },
288 on_finish => sub {
289 my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
290 ...
291 }
292 );
293
294 # Identification given as an option or the 1st argument.
295
296 for my $key ( 'aa' .. 'zz' ) {
297 MCE::Child->create( { ident => $key }, sub { ... } );
298 MCE::Child->create( $key, sub { ... } );
299 }
300
301 MCE::Child->wait_all;
302
303 Set "max_workers" if you want to limit the number of workers by
304 waiting automatically for an available slot. Specify "auto" to
305 obtain the number of logical cores via "MCE::Util::get_ncpu()".
306
307 Set "child_timeout", in number of seconds, if you want the child
308 process to terminate after some time. The default is 0 for no
309 timeout.
310
311 Set "posix_exit" to avoid all END and destructor processing.
312 Constructing MCE::Child inside a thread implies 1, as does the
313 presence of CGI, FCGI, Coro, Curses, Gearman::Util, Gearman::XS,
314 LWP::UserAgent, Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
315
316 Set "void_context" to create the child process in void context for
317 the return value. Otherwise, the return context is wantarray-aware
318 for "join()" and "result()" and determined when retrieving the data.
319
320 The callback options "on_start" and "on_finish" are called in the
321 parent process after starting the worker and later when terminated.
322 The arguments for the subroutines were inspired by
323 Parallel::ForkManager.
324
325 The parameters for "on_start" are the following:
326
327 - pid of the child process
328 - identification (ident option or 1st arg to create)
329
330 The parameters for "on_finish" are the following:
331
332 - pid of the child process
333 - program exit code
334 - identification (ident option or 1st arg to create)
335 - exit signal id
336 - error message from eval inside MCE::Child
337 - returned data
338
339 $child->is_running()
340 Returns true if a child is still running.
341
342 $child->is_joinable()
343 Returns true if the child has finished running and not yet joined.
344
345 $child->kill( 'SIG...' )
346 Sends the specified signal to the child. Returns the child object to
347 allow for method chaining. As with "exit", it is important to join
348 eventually if not immediately to not leave a zombie or defunct
349 process.
350
351 $child->kill('SIG...')->join();
352
353 The following is a parallel demonstration comparing "MCE::Shared"
354 against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
355 after all workers have been notified to quit.
356
357 use Time::HiRes qw(time);
358
359 use Redis;
360 use Redis::Fast;
361
362 use MCE::Child;
363 use MCE::Shared;
364
365 my $redis = Redis->new();
366 my $rfast = Redis::Fast->new();
367 my $array = MCE::Shared->array();
368
369 sub parallel_redis {
370 my ($_redis) = @_;
371 my ($count, $quit, $len) = (0, 0);
372
373 # instead, use a flag to exit loop
374 $SIG{'QUIT'} = sub { $quit = 1 };
375
376 while () {
377 $len = $_redis->rpush('list', $count++);
378 last if $quit;
379 }
380
381 $count;
382 }
383
384 sub parallel_array {
385 my ($count, $quit, $len) = (0, 0);
386
387 # do not exit from inside handler
388 $SIG{'QUIT'} = sub { $quit = 1 };
389
390 while () {
391 $len = $array->push($count++);
392 last if $quit;
393 }
394
395 $count;
396 }
397
398 sub benchmark_this {
399 my ($desc, $num_procs, $timeout, $code, @args) = @_;
400 my ($start, $total) = (time(), 0);
401
402 MCE::Child->new($code, @args) for 1..$num_procs;
403 sleep $timeout;
404
405 # joining is not immediate; ok
406 $_->kill('QUIT') for MCE::Child->list();
407
408 # joining later; ok
409 $total += $_->join() for MCE::Child->list();
410
411 printf "$desc <> duration: %0.03f secs, count: $total\n",
412 time() - $start;
413
414 sleep 0.2;
415 }
416
417 benchmark_this('Redis ', 8, 5.0, \&parallel_redis, $redis);
418 benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
419 benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
420
421 MCE::Child->list()
422 Returns a list of all child objects not yet joined.
423
424 @procs = MCE::Child->list();
425
426 MCE::Child->list_pids()
427 Returns a list of all child pids not yet joined (available since
428 1.849).
429
430 @pids = MCE::Child->list_pids();
431
432 $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
433 # Signal workers all at once
434 CORE::kill('KILL', MCE::Child->list_pids());
435 exec('reset');
436 };
437
438 MCE::Child->list_running()
439 Returns a list of all child objects that are still running.
440
441 @procs = MCE::Child->list_running();
442
443 MCE::Child->list_joinable()
444 Returns a list of all child objects that have completed running.
445 Thus, ready to be joined without blocking.
446
447 @procs = MCE::Child->list_joinable();
448
449 MCE::Child->max_workers([ N ])
450 Getter and setter for max_workers. Specify a number or 'auto' to
451 acquire the total number of cores via MCE::Util::get_ncpu. Specify a
452 false value to set back to no limit.
453
454 MCE::Child->pending()
455 Returns a count of all child objects not yet joined.
456
457 $count = MCE::Child->pending();
458
459 $child->result()
460 Returns the result obtained by "join", "wait_one", or "wait_all". If
461 the process has not yet exited, waits for the corresponding child to
462 complete its execution.
463
464 use MCE::Child;
465 use Time::HiRes qw(sleep);
466
467 sub task {
468 my ($id) = @_;
469 sleep $id * 0.333;
470 return $id;
471 }
472
473 MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
474
475 # 1 while MCE::Child->wait_one();
476
477 while ( my $child = MCE::Child->wait_one() ) {
478 my $err = $child->error() || 'no error';
479 my $res = $child->result();
480 my $pid = $child->pid();
481
482 print "[$pid] $err : $res\n";
483 }
484
485 Like "join" described above, the context (void, scalar or list) for
486 the return value(s) is determined at the time "result" is called and
487 mostly "wantarray" aware.
488
489 my $child1 = MCE::Child->create( sub {
490 my @res = qw(foo bar baz);
491 return (@res);
492 });
493
494 my @res1 = $child1->result(); # ( foo, bar, baz )
495 my $res1 = $child1->result(); # baz
496
497 my $child2 = MCE::Child->create( sub {
498 return 'foo';
499 });
500
501 my @res2 = $child2->result(); # ( foo )
502 my $res2 = $child2->result(); # foo
503
504 MCE::Child->self()
505 Class method that allows a child to obtain its own MCE::Child
506 object.
507
508 $child->pid()
509 $child->tid()
510 Returns the ID of the child.
511
512 pid: $$ process id
513 tid: $$ alias for pid
514
515 MCE::Child->pid()
516 MCE::Child->tid()
517 Class methods that allow a child to obtain its own ID.
518
519 pid: $$ process id
520 tid: $$ alias for pid
521
522 MCE::Child->wait_one()
523 MCE::Child->waitone()
524 MCE::Child->wait_all()
525 MCE::Child->waitall()
526 Meaningful for the manager process only, waits for one or all child
527 processes to complete execution. Afterwards, returns the
528 corresponding child objects. If a child doesn't exist, returns the
529 "undef" value or an empty list for "wait_one" and "wait_all"
530 respectively.
531
532 The "waitone" and "waitall" methods are aliases for compatibility
533 with "MCE::Hobo".
534
535 use MCE::Child;
536 use Time::HiRes qw(sleep);
537
538 sub task {
539 my $id = shift;
540 sleep $id * 0.333;
541 return $id;
542 }
543
544 MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
545
546 # join, traditional use case
547 $_->join() for MCE::Child->list();
548
549 # wait_one, simplistic use case
550 1 while MCE::Child->wait_one();
551
552 # wait_one
553 while ( my $child = MCE::Child->wait_one() ) {
554 my $err = $child->error() || 'no error';
555 my $res = $child->result();
556 my $pid = $child->pid();
557
558 print "[$pid] $err : $res\n";
559 }
560
561 # wait_all
562 my @procs = MCE::Child->wait_all();
563
564 for ( @procs ) {
565 my $err = $_->error() || 'no error';
566 my $res = $_->result();
567 my $pid = $_->pid();
568
569 print "[$pid] $err : $res\n";
570 }
571
572 MCE::Child->yield( [ floating_seconds ] )
573 Give other workers a chance to run, optionally for given time. Yield
574 behaves similarly to MCE's interval option. It throttles workers
575 from running too fast. A demonstration is provided in the next
576 section for fetching URLs in parallel.
577
578 The default "floating_seconds" is 0.008 and 0.015 on UNIX and
579 Windows, respectively. Pass 0 if simply wanting to give other
580 workers a chance to run.
581
582 # total run time: 1.00 second
583
584 MCE::Child->create( sub { MCE::Child->yield(0.25) } ) for 1 .. 4;
585 MCE::Child->wait_all();
586
588 Threads-like detach capability was added starting with the 1.867
589 release.
590
591 A threads example is shown first followed by the MCE::Child example.
592 All one needs to do is set the CHLD signal handler to IGNORE.
593 Unfortunately, this works on UNIX platforms only. The child process
594 restores the CHLD handler to default, so is able to deeply spin workers
595 and reap if desired.
596
597 use threads;
598
599 for ( 1 .. 8 ) {
600 async {
601 # do something
602 }->detach;
603 }
604
605 use MCE::Child;
606
607 # Have the OS reap workers automatically when exiting.
608 # The on_finish option is ignored if specified (no-op).
609 # Ensure not inside a thread on UNIX platforms.
610
611 $SIG{CHLD} = 'IGNORE';
612
613 for ( 1 .. 8 ) {
614 mce_child {
615 # do something
616 };
617 }
618
619 # Optionally, wait for any remaining workers before leaving.
620 # This is necessary if workers are consuming shared objects,
621 # constructed via MCE::Shared.
622
623 MCE::Child->wait_all;
624
625 The following is another way and works on Windows. Here, the on_finish
626 handler works as usual.
627
628 use MCE::Child;
629
630 MCE::Child->init(
631 on_finish => sub {
632 ...
633 },
634 );
635
636 for ( 1 .. 8 ) {
637 $_->join for MCE::Child->list_joinable;
638 mce_child {
639 # do something
640 };
641 }
642
643 MCE::Child->wait_all;
644
646 MCE::Child behaves similarly to threads for the most part. It also
647 provides Parallel::ForkManager-like capabilities. The
648 "Parallel::ForkManager" example is shown first followed by a version
649 using "MCE::Child".
650
651 Parallel::ForkManager
652 use strict;
653 use warnings;
654
655 use Parallel::ForkManager;
656 use Time::HiRes 'time';
657
658 my $start = time;
659
660 my $pm = Parallel::ForkManager->new(10);
661 $pm->set_waitpid_blocking_sleep(0);
662
663 $pm->run_on_finish( sub {
664 my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
665 print "child $pid completed: $ident => ", $resp->[0], "\n";
666 });
667
668 DATA_LOOP:
669 foreach my $data ( 1..2000 ) {
670 # forks and returns the pid for the child
671 my $pid = $pm->start($data) and next DATA_LOOP;
672 my $ret = [ $data * 2 ];
673
674 $pm->finish(0, $ret);
675 }
676
677 $pm->wait_all_children;
678
679 printf STDERR "duration: %0.03f seconds\n", time - $start;
680
681 MCE::Child
682 use strict;
683 use warnings;
684
685 use MCE::Child 1.843;
686 use Time::HiRes 'time';
687
688 my $start = time;
689
690 MCE::Child->init(
691 max_workers => 10,
692 on_finish => sub {
693 my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
694 print "child $pid completed: $ident => ", $resp->[0], "\n";
695 }
696 );
697
698 foreach my $data ( 1..2000 ) {
699 MCE::Child->create( $data, sub {
700 [ $data * 2 ];
701 });
702 }
703
704 MCE::Child->wait_all;
705
706 printf STDERR "duration: %0.03f seconds\n", time - $start;
707
708 Time to spin 2,000 workers and obtain results (in seconds).
709 Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
710 Boost). Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
711 with Moo loaded at the top of the script.
712
713 MCE::Hobo uses MCE::Shared to retrieve data during reaping.
714 MCE::Child uses MCE::Channel, no shared-manager.
715
716 Version Cygwin Windows Linux macOS FreeBSD
717
718 MCE::Child 1.843 19.099s 17.091s 0.965s 1.534s 1.229s
719 MCE::Hobo 1.843 20.514s 19.594s 1.246s 1.629s 1.613s
720 P::FM 1.20 19.703s 19.235s 0.875s 1.445s 1.346s
721
722 MCE::Child 1.843 20.426s 18.417s 1.116s 1.632s 1.338s Moo loaded
723 MCE::Hobo 1.843 21.809s 20.810s 1.407s 1.759s 1.722s Moo loaded
724 P::FM 2.02 21.668s 25.927s 1.882s 2.612s 2.483s Moo used
725
726 Set posix_exit to avoid all END and destructor processing.
727 This is helpful for reducing overhead when workers exit. Ditto if
728 using a Perl module not parallel safe. The option is ignored on
729 Windows "$^O eq 'MSWin32'".
730
731 MCE::Child->init( posix_exit => 1, ... );
732 MCE::Hobo->init( posix_exit => 1, ... );
733
734 Version Cygwin Windows Linux macOS FreeBSD
735
736 MCE::Child 1.843 19.815s ignored 0.824s 1.284s 1.245s Moo loaded
737 MCE::Hobo 1.843 21.029s ignored 0.953s 1.335s 1.439s Moo loaded
738
740 This demonstration constructs two queues, two handles, starts the
741 shared-manager process if needed, and spawns four workers. For this
742 demonstration, I am chunking 64 URLs per job. In reality, one may run
743 with 200 workers and chunk 300 URLs on a 24-way box.
744
745 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
746 # perl demo.pl -- all output
747 # perl demo.pl >/dev/null -- mngr/child output
748 # perl demo.pl 2>/dev/null -- show results only
749 #
750 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
751
752 use strict;
753 use warnings;
754
755 use AnyEvent;
756 use AnyEvent::HTTP;
757 use Time::HiRes qw( time );
758
759 use MCE::Child;
760 use MCE::Shared;
761
762 # Construct two queues, input and return.
763
764 my $que = MCE::Shared->queue();
765 my $ret = MCE::Shared->queue();
766
767 # Construct shared handles for serializing output from many workers
768 # writing simultaneously. This prevents garbled output.
769
770 mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
771 mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
772
773 # Spawn workers early for minimum memory consumption.
774
775 MCE::Child->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
776
777 # Obtain or generate input data for workers to process.
778
779 my ( $count, @urls ) = ( 0 );
780
781 push @urls, map { "http://127.0.0.$_/" } 1..254;
782 push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
783
784 while ( @urls ) {
785 my @chunk = splice(@urls, 0, 64);
786 $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
787 }
788
789 # So that workers leave the loop after consuming the queue.
790
791 $que->end();
792
793 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
794 # Loop for the manager process. The manager may do other work if
795 # need be and periodically check $ret->pending() not shown here.
796 #
797 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
798
799 my $start = time;
800
801 printf {$ERR} "Mngr - entering loop\n";
802
803 while ( $count ) {
804 my ( $result, $failed ) = $ret->dequeue( 2 );
805
806 # Remove ID from result, so not treated as a URL item.
807
808 printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
809
810 # Display the URL and the size captured.
811
812 foreach my $url ( keys %{ $result } ) {
813 printf {$OUT} "%s: %d\n", $url, length($result->{$url})
814 if $result->{$url}; # url has content
815 }
816
817 # Display the URLs that could not be reached.
818
819 if ( @{ $failed } ) {
820 foreach my $url ( @{ $failed } ) {
821 print {$OUT} "Failed: $url\n";
822 }
823 }
824
825 # Decrement the count.
826
827 $count--;
828 }
829
830 MCE::Child->wait_all();
831
832 printf {$ERR} "Mngr - exiting loop\n\n";
833 printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
834
835 exit;
836
837 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
838 # Child processes enqueue two items ( $result and $failed ) per each
839 # job for the manager process. Likewise, the manager process dequeues
840 # two items above. Optionally, child processes may include the ID in
841 # the result.
842 #
843 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
844
845 sub task {
846 my ( $id ) = @_;
847 printf {$ERR} "Child $id entering loop\n";
848
849 while ( my $job = $que->dequeue() ) {
850 my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
851
852 # Walk URLs, provide a hash and array refs for data.
853
854 printf {$ERR} "Child $id running job $job->{ID}\n";
855 walk( $job, $result, $failed );
856
857 # Send results to the manager process.
858
859 $ret->enqueue( $result, $failed );
860 }
861
862 printf {$ERR} "Child $id exiting loop\n";
863 }
864
865 sub walk {
866 my ( $job, $result, $failed ) = @_;
867
868 # Yielding is critical when running an event loop in parallel.
869 # Not doing so means that the app may reach contention points
870 # with the firewall and likely impose unnecessary hardship at
871 # the OS level. The idea here is not to have multiple workers
872 # initiate HTTP requests to a batch of URLs at the same time.
873 # Yielding behaves similarly to scatter to have the child
874 # process run solo for a fraction of time.
875
876 MCE::Child->yield( 0.03 );
877
878 my $cv = AnyEvent->condvar();
879
880 # Populate the hash ref for the URLs it could reach.
881 # Do not mix AnyEvent timeout with child timeout.
882 # Therefore, choose event timeout when available.
883
884 foreach my $url ( @{ $job->{INPUT} } ) {
885 $cv->begin();
886 http_get $url, timeout => 2, sub {
887 my ( $data, $headers ) = @_;
888 $result->{$url} = $data;
889 $cv->end();
890 };
891 }
892
893 $cv->recv();
894
895 # Populate the array ref for URLs it could not reach.
896
897 foreach my $url ( @{ $job->{INPUT} } ) {
898 push @{ $failed }, $url unless (exists $result->{ $url });
899 }
900
901 return;
902 }
903
904 __END__
905
906 $ perl demo.pl
907
908 Child 1 entering loop
909 Child 2 entering loop
910 Child 3 entering loop
911 Mngr - entering loop
912 Child 2 running job 2
913 Child 3 running job 3
914 Child 1 running job 1
915 Child 4 entering loop
916 Child 4 running job 4
917 Child 2 running job 5
918 Mngr - received job 2
919 Child 3 running job 6
920 Mngr - received job 3
921 Child 1 running job 7
922 Mngr - received job 1
923 Child 4 running job 8
924 Mngr - received job 4
925 http://192.168.0.1/: 3729
926 Child 2 exiting loop
927 Mngr - received job 5
928 Child 3 exiting loop
929 Mngr - received job 6
930 Child 1 exiting loop
931 Mngr - received job 7
932 Child 4 exiting loop
933 Mngr - received job 8
934 Mngr - exiting loop
935
936 Duration: 4.131 seconds
937
939 Making an executable is possible with the PAR::Packer module. On the
940 Windows platform, threads, threads::shared, and exiting via threads are
941 necessary for the binary to exit successfully.
942
943 # https://metacpan.org/pod/PAR::Packer
944 # https://metacpan.org/pod/pp
945 #
946 # pp -o demo.exe demo.pl
947 # ./demo.exe
948
949 use strict;
950 use warnings;
951
952 use if $^O eq "MSWin32", "threads";
953 use if $^O eq "MSWin32", "threads::shared";
954
955 # Include minimum dependencies for MCE::Child.
956 # Add other modules required by your application here.
957
958 use Storable ();
959 use Time::HiRes ();
960
961 # use IO::FDPass (); # optional: for condvar, handle, queue
962 # use Sereal (); # optional: for faster serialization
963
964 use MCE::Child;
965 use MCE::Shared;
966
967 # For PAR to work on the Windows platform, one must include manually
968 # any shared modules used by the application.
969
970 # use MCE::Shared::Array; # if using MCE::Shared->array
971 # use MCE::Shared::Cache; # if using MCE::Shared->cache
972 # use MCE::Shared::Condvar; # if using MCE::Shared->condvar
973 # use MCE::Shared::Handle; # if using MCE::Shared->handle, mce_open
974 # use MCE::Shared::Hash; # if using MCE::Shared->hash
975 # use MCE::Shared::Minidb; # if using MCE::Shared->minidb
976 # use MCE::Shared::Ordhash; # if using MCE::Shared->ordhash
977 # use MCE::Shared::Queue; # if using MCE::Shared->queue
978 # use MCE::Shared::Scalar; # if using MCE::Shared->scalar
979
980 # Et cetera. Only load modules needed for your application.
981
982 use MCE::Shared::Sequence; # if using MCE::Shared->sequence
983
984 my $seq = MCE::Shared->sequence( 1, 9 );
985
986 sub task {
987 my ( $id ) = @_;
988 while ( defined ( my $num = $seq->next() ) ) {
989 print "$id: $num\n";
990 sleep 1;
991 }
992 }
993
994 sub main {
995 MCE::Child->new( \&task, $_ ) for 1 .. 3;
996 MCE::Child->wait_all();
997 }
998
999 # Main must run inside a thread on the Windows platform or workers
1000 # will fail during exiting, causing the exe to crash. The reason is
1001 # that PAR or a dependency isn't multi-process safe.
1002
1003 ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
1004
1005 threads->exit(0) if $INC{"threads.pm"};
1006
1008 MCE::Child emits an error when "is_joinable", "is_running", and "join"
1009 are not called by the manager process, where the child was spawned. This
1010 is a limitation in MCE::Child only due to not involving a shared-
1011 manager process for IPC.
1012
1013 This use-case is not typical.
1014
1016 The inspiration for "MCE::Child" comes from wanting "threads"-like
1017 behavior for processes compatible with Perl 5.8. Both can run side-by-
1018 side including safe-use by MCE workers. Likewise, the documentation
1019 resembles "threads".
1020
1021 The inspiration for "wait_all" and "wait_one" comes from the
1022 "Parallel::WorkUnit" module.
1023
1025 · forks
1026
1027 · forks::BerkeleyDB
1028
1029 · MCE::Hobo
1030
1031 · Parallel::ForkManager
1032
1033 · Parallel::Loops
1034
1035 · Parallel::Prefork
1036
1037 · Parallel::WorkUnit
1038
1039 · Proc::Fork
1040
1041 · Thread::Tie
1042
1043 · threads
1044
1046 MCE, MCE::Channel, MCE::Shared
1047
1049 Mario E. Roy, <marioeroy AT gmail DOT com>
1050
1051
1052
1053perl v5.32.0 2020-08-19 MCE::Child(3)