1MCE::Child(3)         User Contributed Perl Documentation        MCE::Child(3)
2
3
4

NAME

6       MCE::Child - A threads-like parallelization module compatible with Perl
7       5.8
8

VERSION

10       This document describes MCE::Child version 1.884
11

SYNOPSIS

13        use MCE::Child;
14
15        MCE::Child->init(
16            max_workers => 'auto',   # default undef, unlimited
17
18            # Specify a percentage. MCE::Child 1.876+.
19            max_workers => '25%',    # 4 on HW with 16 lcores
20            max_workers => '50%',    # 8 on HW with 16 lcores
21
22            child_timeout => 20,     # default undef, no timeout
23            posix_exit => 1,         # default undef, CORE::exit
24            void_context => 1,       # default undef
25
26            on_start => sub {
27                my ( $pid, $ident ) = @_;
28                ...
29            },
30            on_finish => sub {
31                my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
32                ...
33            }
34        );
35
36        MCE::Child->create( sub { print "Hello from child\n" } )->join();
37
38        sub parallel {
39            my ($arg1) = @_;
40            print "Hello again, $arg1\n" if defined($arg1);
41            print "Hello again, $_\n"; # same thing
42        }
43
44        MCE::Child->create( \&parallel, $_ ) for 1 .. 3;
45
46        my @procs    = MCE::Child->list();
47        my @pids     = MCE::Child->list_pids();
48        my @running  = MCE::Child->list_running();
49        my @joinable = MCE::Child->list_joinable();
50        my @count    = MCE::Child->pending();
51
52        # Joining is orderly, e.g. child1 is joined first, child2, child3.
53        $_->join() for @procs;   # (or)
54        $_->join() for @joinable;
55
56        # Joining occurs immediately as child processes complete execution.
57        1 while MCE::Child->wait_one();
58
59        my $child = mce_child { foreach (@files) { ... } };
60
61        $child->join();
62
63        if ( my $err = $child->error() ) {
64            warn "Child error: $err\n";
65        }
66
67        # Get a child's object
68        $child = MCE::Child->self();
69
70        # Get a child's ID
71        $pid = MCE::Child->pid();  # $$
72        $pid = $child->pid();
73        $pid = MCE::Child->tid();  # tid is an alias for pid
74        $pid = $child->tid();
75
76        # Test child objects
77        if ( $child1 == $child2 ) {
78            ...
79        }
80
81        # Give other workers a chance to run
82        MCE::Child->yield();
83        MCE::Child->yield(0.05);
84
85        # Return context, wantarray aware
86        my ($value1, $value2) = $child->join();
87        my $value = $child->join();
88
89        # Check child's state
90        if ( $child->is_running() ) {
91            sleep 1;
92        }
93        if ( $child->is_joinable() ) {
94            $child->join();
95        }
96
97        # Send a signal to a child
98        $child->kill('SIGUSR1');
99
100        # Exit a child
101        MCE::Child->exit(0);
102        MCE::Child->exit(0, @ret);
103

DESCRIPTION

105       MCE::Child is a fork of MCE::Hobo for compatibility with Perl 5.8.
106
107       A child is a migratory worker inside the machine that carries the
108       asynchronous gene. Child processes are equipped with "threads"-like
109       capability for running code asynchronously. Unlike threads, each child
110       is a unique process to the underlying OS. The IPC is handled via
111       "MCE::Channel", which runs on all the major platforms including Cygwin
112       and Strawberry Perl.
113
114       "MCE::Child" may be used as a standalone or together with "MCE"
115       including running alongside "threads".
116
117        use MCE::Child;
118        use MCE::Shared;
119
120        # synopsis: head -20 file.txt | perl script.pl
121
122        my $ifh = MCE::Shared->handle( "<", \*STDIN  );  # shared
123        my $ofh = MCE::Shared->handle( ">", \*STDOUT );
124        my $ary = MCE::Shared->array();
125
126        sub parallel_task {
127            my ( $id ) = @_;
128            while ( <$ifh> ) {
129                printf {$ofh} "[ %4d ] %s", $., $_;
130              # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n" );  # dereferencing
131                $ary->set( $. - 1, "[ ID $id ] read line $.\n" );  # faster via OO
132            }
133        }
134
135        my $child1 = MCE::Child->new( "parallel_task", 1 );
136        my $child2 = MCE::Child->new( \&parallel_task, 2 );
137        my $child3 = MCE::Child->new( sub { parallel_task(3) } );
138
139        $_->join for MCE::Child->list();  # ditto: MCE::Child->wait_all();
140
141        # search array (total one round-trip via IPC)
142        my @vals = $ary->vals( "val =~ / ID 2 /" );
143
144        print {*STDERR} join("", @vals);
145

API DOCUMENTATION

147       $child = MCE::Child->create( FUNCTION, ARGS )
148       $child = MCE::Child->new( FUNCTION, ARGS )
149          This will create a new child process that will begin execution with
150          function as the entry point, and optionally ARGS for list of
151          parameters. It will return the corresponding MCE::Child object, or
152          undef if child creation failed.
153
154          FUNCTION may either be the name of a function, an anonymous
155          subroutine, or a code ref.
156
157           my $child = MCE::Child->create( "func_name", ... );
158               # or
159           my $child = MCE::Child->create( sub { ... }, ... );
160               # or
161           my $child = MCE::Child->create( \&func, ... );
162
163       $child = MCE::Child->create( { options }, FUNCTION, ARGS )
164       $child = MCE::Child->create( IDENT, FUNCTION, ARGS )
165          Options, excluding "ident", may be specified globally via the "init"
166          function.  Otherwise, "ident", "child_timeout", "posix_exit", and
167          "void_context" may be set uniquely.
168
169          The "ident" option is used by callback functions "on_start" and
170          "on_finish" for identifying the started and finished child process
171          respectively.
172
173           my $child1 = MCE::Child->create( { posix_exit => 1 }, sub {
174               ...
175           } );
176
177           $child1->join;
178
179           my $child2 = MCE::Child->create( { child_timeout => 3 }, sub {
180               sleep 1 for ( 1 .. 9 );
181           } );
182
183           $child2->join;
184
185           if ( $child2->error() eq "Child timed out\n" ) {
186               ...
187           }
188
189          The new() method is an alias for create().
190
191       mce_child { BLOCK } ARGS;
192       mce_child { BLOCK };
193          "mce_child" runs the block asynchronously similarly to
194          "MCE::Child->create()".  It returns the child object, or undef if
195          child creation failed.
196
197           my $child = mce_child { foreach (@files) { ... } };
198
199           $child->join();
200
201           if ( my $err = $child->error() ) {
202               warn("Child error: $err\n");
203           }
204
205       $child->join()
206          This will wait for the corresponding child process to complete its
207          execution.  In non-voided context, join() will return the value(s)
208          of the entry point function.
209
210          The context (void, scalar or list) for the return value(s) for
211          "join" is determined at the time of joining and mostly "wantarray"
212          aware.
213
214           my $child1 = MCE::Child->create( sub {
215               my @res = qw(foo bar baz);
216               return (@res);
217           });
218
219           my @res1 = $child1->join();  # ( foo, bar, baz )
220           my $res1 = $child1->join();  #   baz
221
222           my $child2 = MCE::Child->create( sub {
223               return 'foo';
224           });
225
226           my @res2 = $child2->join();  # ( foo )
227           my $res2 = $child2->join();  #   foo
228
229       $child1->equal( $child2 )
230          Tests if two child objects are the same child or not. Child
231          comparison is based on process IDs. This is overloaded to the more
232          natural forms.
233
234           if ( $child1 == $child2 ) {
235               print("Child objects are the same\n");
236           }
237           # or
238           if ( $child1 != $child2 ) {
239               print("Child objects differ\n");
240           }
241
242       $child->error()
243          Child processes are executed in an "eval" context. This method will
244          return "undef" if the child terminates normally. Otherwise, it
245          returns the value of $@ associated with the child's execution status
246          in its "eval" context.
247
248       $child->exit()
249          This sends 'SIGQUIT' to the child process, notifying the child to
250          exit.  It returns the child object to allow for method chaining. It
251          is important to join later if not immediately to not leave a zombie
252          or defunct process.
253
254           $child->exit()->join();
255           ...
256
257           $child->join();  # later
258
259       MCE::Child->exit( 0 )
260       MCE::Child->exit( 0, @ret )
261          A child can exit at any time by calling "MCE::Child->exit()".
262          Otherwise, the behavior is the same as exit(status) when called from
263          the main process. The child process may optionally return data, to
264          be sent via IPC.
265
266       MCE::Child->finish()
267          This class method is called automatically by "END", but may be
268          called explicitly. An error is emitted via croak if there are active
269          child processes not yet joined.
270
271           MCE::Child->create( 'task1', $_ ) for 1 .. 4;
272           $_->join for MCE::Child->list();
273
274           MCE::Child->create( 'task2', $_ ) for 1 .. 4;
275           $_->join for MCE::Child->list();
276
277           MCE::Child->create( 'task3', $_ ) for 1 .. 4;
278           $_->join for MCE::Child->list();
279
280           MCE::Child->finish();
281
282       MCE::Child->init( options )
283          The init function accepts a list of MCE::Child options.
284
285           MCE::Child->init(
286               max_workers => 'auto',   # default undef, unlimited
287
288               # Specify a percentage. MCE::Child 1.876+.
289               max_workers => '25%',    # 4 on HW with 16 lcores
290               max_workers => '50%',    # 8 on HW with 16 lcores
291
292               child_timeout => 20,     # default undef, no timeout
293               posix_exit => 1,         # default undef, CORE::exit
294               void_context => 1,       # default undef
295
296               on_start => sub {
297                   my ( $pid, $ident ) = @_;
298                   ...
299               },
300               on_finish => sub {
301                   my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
302                   ...
303               }
304           );
305
306           # Identification given as an option or the 1st argument.
307
308           for my $key ( 'aa' .. 'zz' ) {
309               MCE::Child->create( { ident => $key }, sub { ... } );
310               MCE::Child->create( $key, sub { ... } );
311           }
312
313           MCE::Child->wait_all;
314
315          Set "max_workers" if you want to limit the number of workers by
316          waiting automatically for an available slot. Specify a percentage or
317          "auto" to obtain the number of logical cores via
318          MCE::Util::get_ncpu().
319
320          Set "child_timeout", in number of seconds, if you want the child
321          process to terminate after some time. The default is 0 for no
322          timeout.
323
324          Set "posix_exit" to avoid all END and destructor processing.
325          Constructing MCE::Child inside a thread implies 1 or if present CGI,
326          FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
327          Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
328
329          Set "void_context" to create the child process in void context for
330          the return value. Otherwise, the return context is wantarray-aware
331          for join() and result() and determined when retrieving the data.
332
333          The callback options "on_start" and "on_finish" are called in the
334          parent process after starting the worker and later when terminated.
335          The arguments for the subroutines were inspired by
336          Parallel::ForkManager.
337
338          The parameters for "on_start" are the following:
339
340           - pid of the child process
341           - identification (ident option or 1st arg to create)
342
343          The parameters for "on_finish" are the following:
344
345           - pid of the child process
346           - program exit code
347           - identification (ident option or 1st arg to create)
348           - exit signal id
349           - error message from eval inside MCE::Child
350           - returned data
351
352       $child->is_running()
353          Returns true if a child is still running.
354
355       $child->is_joinable()
356          Returns true if the child has finished running and not yet joined.
357
358       $child->kill( 'SIG...' )
359          Sends the specified signal to the child. Returns the child object to
360          allow for method chaining. As with "exit", it is important to join
361          eventually if not immediately to not leave a zombie or defunct
362          process.
363
364           $child->kill('SIG...')->join();
365
366          The following is a parallel demonstration comparing "MCE::Shared"
367          against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
368          after all workers have been notified to quit.
369
370           use Time::HiRes qw(time);
371
372           use Redis;
373           use Redis::Fast;
374
375           use MCE::Child;
376           use MCE::Shared;
377
378           my $redis = Redis->new();
379           my $rfast = Redis::Fast->new();
380           my $array = MCE::Shared->array();
381
382           sub parallel_redis {
383               my ($_redis) = @_;
384               my ($count, $quit, $len) = (0, 0);
385
386               # instead, use a flag to exit loop
387               $SIG{'QUIT'} = sub { $quit = 1 };
388
389               while () {
390                   $len = $_redis->rpush('list', $count++);
391                   last if $quit;
392               }
393
394               $count;
395           }
396
397           sub parallel_array {
398               my ($count, $quit, $len) = (0, 0);
399
400               # do not exit from inside handler
401               $SIG{'QUIT'} = sub { $quit = 1 };
402
403               while () {
404                   $len = $array->push($count++);
405                   last if $quit;
406               }
407
408               $count;
409           }
410
411           sub benchmark_this {
412               my ($desc, $num_procs, $timeout, $code, @args) = @_;
413               my ($start, $total) = (time(), 0);
414
415               MCE::Child->new($code, @args) for 1..$num_procs;
416               sleep $timeout;
417
418               # joining is not immediate; ok
419               $_->kill('QUIT') for MCE::Child->list();
420
421               # joining later; ok
422               $total += $_->join() for MCE::Child->list();
423
424               printf "$desc <> duration: %0.03f secs, count: $total\n",
425                   time() - $start;
426
427               sleep 0.2;
428           }
429
430           benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
431           benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
432           benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
433
434       MCE::Child->list()
435          Returns a list of all child objects not yet joined.
436
437           @procs = MCE::Child->list();
438
439       MCE::Child->list_pids()
440          Returns a list of all child pids not yet joined (available since
441          1.849).
442
443           @pids = MCE::Child->list_pids();
444
445           $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
446               # Signal workers all at once
447               CORE::kill('KILL', MCE::Child->list_pids());
448               exec('reset');
449           };
450
451       MCE::Child->list_running()
452          Returns a list of all child objects that are still running.
453
454           @procs = MCE::Child->list_running();
455
456       MCE::Child->list_joinable()
457          Returns a list of all child objects that have completed running.
458          Thus, ready to be joined without blocking.
459
460           @procs = MCE::Child->list_joinable();
461
462       MCE::Child->max_workers([ N ])
463          Getter and setter for max_workers. Specify a number or 'auto' to
464          acquire the total number of cores via MCE::Util::get_ncpu. Specify a
465          false value to set back to no limit.
466
467       MCE::Child->pending()
468          Returns a count of all child objects not yet joined.
469
470           $count = MCE::Child->pending();
471
472       $child->result()
473          Returns the result obtained by "join", "wait_one", or "wait_all". If
474          the process has not yet exited, waits for the corresponding child to
475          complete its execution.
476
477           use MCE::Child;
478           use Time::HiRes qw(sleep);
479
480           sub task {
481               my ($id) = @_;
482               sleep $id * 0.333;
483               return $id;
484           }
485
486           MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
487
488           # 1 while MCE::Child->wait_one();
489
490           while ( my $child = MCE::Child->wait_one() ) {
491               my $err = $child->error() || 'no error';
492               my $res = $child->result();
493               my $pid = $child->pid();
494
495               print "[$pid] $err : $res\n";
496           }
497
498          Like "join" described above, the context (void, scalar or list) for
499          the return value(s) is determined at the time "result" is called and
500          mostly "wantarray" aware.
501
502           my $child1 = MCE::Child->create( sub {
503               my @res = qw(foo bar baz);
504               return (@res);
505           });
506
507           my @res1 = $child1->result();  # ( foo, bar, baz )
508           my $res1 = $child1->result();  #   baz
509
510           my $child2 = MCE::Child->create( sub {
511               return 'foo';
512           });
513
514           my @res2 = $child2->result();  # ( foo )
515           my $res2 = $child2->result();  #   foo
516
517       MCE::Child->self()
518          Class method that allows a child to obtain it's own MCE::Child
519          object.
520
521       $child->pid()
522       $child->tid()
523          Returns the ID of the child.
524
525           pid: $$  process id
526           tid: $$  alias for pid
527
528       MCE::Child->pid()
529       MCE::Child->tid()
530          Class methods that allows a child to obtain its own ID.
531
532           pid: $$  process id
533           tid: $$  alias for pid
534
535       MCE::Child->wait_one()
536       MCE::Child->waitone()
537       MCE::Child->wait_all()
538       MCE::Child->waitall()
539          Meaningful for the manager process only, waits for one or all child
540          processes to complete execution. Afterwards, returns the
541          corresponding child objects.  If a child doesn't exist, returns the
542          "undef" value or an empty list for "wait_one" and "wait_all"
543          respectively.
544
545          The "waitone" and "waitall" methods are aliases for compatibility
546          with "MCE::Hobo".
547
548           use MCE::Child;
549           use Time::HiRes qw(sleep);
550
551           sub task {
552               my $id = shift;
553               sleep $id * 0.333;
554               return $id;
555           }
556
557           MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
558
559           # join, traditional use case
560           $_->join() for MCE::Child->list();
561
562           # wait_one, simplistic use case
563           1 while MCE::Child->wait_one();
564
565           # wait_one
566           while ( my $child = MCE::Child->wait_one() ) {
567               my $err = $child->error() || 'no error';
568               my $res = $child->result();
569               my $pid = $child->pid();
570
571               print "[$pid] $err : $res\n";
572           }
573
574           # wait_all
575           my @procs = MCE::Child->wait_all();
576
577           for ( @procs ) {
578               my $err = $_->error() || 'no error';
579               my $res = $_->result();
580               my $pid = $_->pid();
581
582               print "[$pid] $err : $res\n";
583           }
584
585       MCE::Child->yield( [ floating_seconds ] )
586          Give other workers a chance to run, optionally for given time. Yield
587          behaves similarly to MCE's interval option. It throttles workers
588          from running too fast.  A demonstration is provided in the next
589          section for fetching URLs in parallel.
590
591          The default "floating_seconds" is 0.008 and 0.015 on UNIX and
592          Windows, respectively. Pass 0 if simply wanting to give other
593          workers a chance to run.
594
595           # total run time: 1.00 second
596
597           MCE::Child->create( sub { MCE::Child->yield(0.25) } ) for 1 .. 4;
598           MCE::Child->wait_all();
599

THREADS-like DETACH CAPABILITY

601       Threads-like detach capability was added starting with the 1.867
602       release.
603
604       A threads example is shown first followed by the MCE::Child example.
605       All one needs to do is set the CHLD signal handler to IGNORE.
606       Unfortunately, this works on UNIX platforms only. The child process
607       restores the CHLD handler to default, so is able to deeply spin workers
608       and reap if desired.
609
610        use threads;
611
612        for ( 1 .. 8 ) {
613            async {
614                # do something
615            }->detach;
616        }
617
618        use MCE::Child;
619
620        # Have the OS reap workers automatically when exiting.
621        # The on_finish option is ignored if specified (no-op).
622        # Ensure not inside a thread on UNIX platforms.
623
624        $SIG{CHLD} = 'IGNORE';
625
626        for ( 1 .. 8 ) {
627            mce_child {
628                # do something
629            };
630        }
631
632        # Optionally, wait for any remaining workers before leaving.
633        # This is necessary if workers are consuming shared objects,
634        # constructed via MCE::Shared.
635
636        MCE::Child->wait_all;
637
638       The following is another way and works on Windows.  Here, the on_finish
639       handler works as usual.
640
641        use MCE::Child;
642
643        MCE::Child->init(
644            on_finish = sub {
645                ...
646            },
647        );
648
649        for ( 1 .. 8 ) {
650            $_->join for MCE::Child->list_joinable;
651            mce_child {
652                # do something
653            };
654        }
655
656        MCE::Child->wait_all;
657

PARALLEL::FORKMANAGER-like DEMONSTRATION

659       MCE::Child behaves similarly to threads for the most part. It also
660       provides Parallel::ForkManager-like capabilities. The
661       "Parallel::ForkManager" example is shown first followed by a version
662       using "MCE::Child".
663
664       Parallel::ForkManager
665           use strict;
666           use warnings;
667
668           use Parallel::ForkManager;
669           use Time::HiRes 'time';
670
671           my $start = time;
672
673           my $pm = Parallel::ForkManager->new(10);
674           $pm->set_waitpid_blocking_sleep(0);
675
676           $pm->run_on_finish( sub {
677               my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
678               print "child $pid completed: $ident => ", $resp->[0], "\n";
679           });
680
681           DATA_LOOP:
682           foreach my $data ( 1..2000 ) {
683               # forks and returns the pid for the child
684               my $pid = $pm->start($data) and next DATA_LOOP;
685               my $ret = [ $data * 2 ];
686
687               $pm->finish(0, $ret);
688           }
689
690           $pm->wait_all_children;
691
692           printf STDERR "duration: %0.03f seconds\n", time - $start;
693
694       MCE::Child
695           use strict;
696           use warnings;
697
698           use MCE::Child 1.843;
699           use Time::HiRes 'time';
700
701           my $start = time;
702
703           MCE::Child->init(
704               max_workers => 10,
705               on_finish   => sub {
706                   my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
707                   print "child $pid completed: $ident => ", $resp->[0], "\n";
708               }
709           );
710
711           foreach my $data ( 1..2000 ) {
712               MCE::Child->create( $data, sub {
713                   [ $data * 2 ];
714               });
715           }
716
717           MCE::Child->wait_all;
718
719           printf STDERR "duration: %0.03f seconds\n", time - $start;
720
721       Time to spin 2,000 workers and obtain results (in seconds).
722          Results were obtained on a Macbook Pro (2.6 GHz ~ 3.6 GHz with Turbo
723          Boost).  Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
724          with Moo loaded at the top of the script.
725
726           MCE::Hobo uses MCE::Shared to retrieve data during reaping.
727           MCE::Child uses MCE::Channel, no shared-manager.
728
729                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
730
731           MCE::Child 1.843  19.099s  17.091s  0.965s  1.534s  1.229s
732            MCE::Hobo 1.843  20.514s  19.594s  1.246s  1.629s  1.613s
733                P::FM 1.20   19.703s  19.235s  0.875s  1.445s  1.346s
734
735           MCE::Child 1.843  20.426s  18.417s  1.116s  1.632s  1.338s  Moo loaded
736            MCE::Hobo 1.843  21.809s  20.810s  1.407s  1.759s  1.722s  Moo loaded
737                P::FM 2.02   21.668s  25.927s  1.882s  2.612s  2.483s  Moo used
738
739       Set posix_exit to avoid all END and destructor processing.
740          This is helpful for reducing overhead when workers exit. Ditto if
741          using a Perl module not parallel safe. The option is ignored on
742          Windows "$^O eq 'MSWin32'".
743
744           MCE::Child->init( posix_exit => 1, ... );
745            MCE::Hobo->init( posix_exit => 1, ... );
746
747                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
748
749           MCE::Child 1.843  19.815s  ignored  0.824s  1.284s  1.245s  Moo loaded
750            MCE::Hobo 1.843  21.029s  ignored  0.953s  1.335s  1.439s  Moo loaded
751

PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT

753       This demonstration constructs two queues, two handles, starts the
754       shared-manager process if needed, and spawns four workers.  For this
755       demonstration, am chunking 64 URLs per job. In reality, one may run
756       with 200 workers and chunk 300 URLs on a 24-way box.
757
758        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
759        # perl demo.pl              -- all output
760        # perl demo.pl  >/dev/null  -- mngr/child output
761        # perl demo.pl 2>/dev/null  -- show results only
762        #
763        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
764
765        use strict;
766        use warnings;
767
768        use AnyEvent;
769        use AnyEvent::HTTP;
770        use Time::HiRes qw( time );
771
772        use MCE::Child;
773        use MCE::Shared;
774
775        # Construct two queues, input and return.
776
777        my $que = MCE::Shared->queue();
778        my $ret = MCE::Shared->queue();
779
780        # Construct shared handles for serializing output from many workers
781        # writing simultaneously. This prevents garbled output.
782
783        mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
784        mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
785
786        # Spawn workers early for minimum memory consumption.
787
788        MCE::Child->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
789
790        # Obtain or generate input data for workers to process.
791
792        my ( $count, @urls ) = ( 0 );
793
794        push @urls, map { "http://127.0.0.$_/"   } 1..254;
795        push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
796
797        while ( @urls ) {
798            my @chunk = splice(@urls, 0, 64);
799            $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
800        }
801
802        # So that workers leave the loop after consuming the queue.
803
804        $que->end();
805
806        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
807        # Loop for the manager process. The manager may do other work if
808        # need be and periodically check $ret->pending() not shown here.
809        #
810        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
811
812        my $start = time;
813
814        printf {$ERR} "Mngr  - entering loop\n";
815
816        while ( $count ) {
817            my ( $result, $failed ) = $ret->dequeue( 2 );
818
819            # Remove ID from result, so not treated as a URL item.
820
821            printf {$ERR} "Mngr  - received job %s\n", delete $result->{ID};
822
823            # Display the URL and the size captured.
824
825            foreach my $url ( keys %{ $result } ) {
826                printf {$OUT} "%s: %d\n", $url, length($result->{$url})
827                    if $result->{$url};  # url has content
828            }
829
830            # Display URLs could not reach.
831
832            if ( @{ $failed } ) {
833                foreach my $url ( @{ $failed } ) {
834                    print {$OUT} "Failed: $url\n";
835                }
836            }
837
838            # Decrement the count.
839
840            $count--;
841        }
842
843        MCE::Child->wait_all();
844
845        printf {$ERR} "Mngr  - exiting loop\n\n";
846        printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
847
848        exit;
849
850        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
851        # Child processes enqueue two items ( $result and $failed ) per each
852        # job for the manager process. Likewise, the manager process dequeues
853        # two items above. Optionally, child processes may include the ID in
854        # the result.
855        #
856        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
857
858        sub task {
859            my ( $id ) = @_;
860            printf {$ERR} "Child $id entering loop\n";
861
862            while ( my $job = $que->dequeue() ) {
863                my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
864
865                # Walk URLs, provide a hash and array refs for data.
866
867                printf {$ERR} "Child $id running  job $job->{ID}\n";
868                walk( $job, $result, $failed );
869
870                # Send results to the manager process.
871
872                $ret->enqueue( $result, $failed );
873            }
874
875            printf {$ERR} "Child $id exiting loop\n";
876        }
877
878        sub walk {
879            my ( $job, $result, $failed ) = @_;
880
881            # Yielding is critical when running an event loop in parallel.
882            # Not doing so means that the app may reach contention points
883            # with the firewall and likely impose unnecessary hardship at
884            # the OS level. The idea here is not to have multiple workers
885            # initiate HTTP requests to a batch of URLs at the same time.
886            # Yielding behaves similarly like scatter to have the child
887            # process run solo for a fraction of time.
888
889            MCE::Child->yield( 0.03 );
890
891            my $cv = AnyEvent->condvar();
892
893            # Populate the hash ref for the URLs it could reach.
894            # Do not mix AnyEvent timeout with child timeout.
895            # Therefore, choose event timeout when available.
896
897            foreach my $url ( @{ $job->{INPUT} } ) {
898                $cv->begin();
899                http_get $url, timeout => 2, sub {
900                    my ( $data, $headers ) = @_;
901                    $result->{$url} = $data;
902                    $cv->end();
903                };
904            }
905
906            $cv->recv();
907
908            # Populate the array ref for URLs it could not reach.
909
910            foreach my $url ( @{ $job->{INPUT} } ) {
911                push @{ $failed }, $url unless (exists $result->{ $url });
912            }
913
914            return;
915        }
916
917        __END__
918
919        $ perl demo.pl
920
921        Child 1 entering loop
922        Child 2 entering loop
923        Child 3 entering loop
924        Mngr  - entering loop
925        Child 2 running  job 2
926        Child 3 running  job 3
927        Child 1 running  job 1
928        Child 4 entering loop
929        Child 4 running  job 4
930        Child 2 running  job 5
931        Mngr  - received job 2
932        Child 3 running  job 6
933        Mngr  - received job 3
934        Child 1 running  job 7
935        Mngr  - received job 1
936        Child 4 running  job 8
937        Mngr  - received job 4
938        http://192.168.0.1/: 3729
939        Child 2 exiting loop
940        Mngr  - received job 5
941        Child 3 exiting loop
942        Mngr  - received job 6
943        Child 1 exiting loop
944        Mngr  - received job 7
945        Child 4 exiting loop
946        Mngr  - received job 8
947        Mngr  - exiting loop
948
949        Duration: 4.131 seconds
950

CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE

952       Making an executable is possible with the PAR::Packer module.  On the
953       Windows platform, threads, threads::shared, and exiting via threads are
954       necessary for the binary to exit successfully.
955
956        # https://metacpan.org/pod/PAR::Packer
957        # https://metacpan.org/pod/pp
958        #
959        #   pp -o demo.exe demo.pl
960        #   ./demo.exe
961
962        use strict;
963        use warnings;
964
965        use if $^O eq "MSWin32", "threads";
966        use if $^O eq "MSWin32", "threads::shared";
967
968        # Include minimum dependencies for MCE::Child.
969        # Add other modules required by your application here.
970
971        use Storable ();
972        use Time::HiRes ();
973
974        # use IO::FDPass ();  # optional: for condvar, handle, queue
975        # use Sereal ();      # optional: for faster serialization
976
977        use MCE::Child;
978        use MCE::Shared;
979
980        # For PAR to work on the Windows platform, one must include manually
981        # any shared modules used by the application.
982
983        # use MCE::Shared::Array;    # if using MCE::Shared->array
984        # use MCE::Shared::Cache;    # if using MCE::Shared->cache
985        # use MCE::Shared::Condvar;  # if using MCE::Shared->condvar
986        # use MCE::Shared::Handle;   # if using MCE::Shared->handle, mce_open
987        # use MCE::Shared::Hash;     # if using MCE::Shared->hash
988        # use MCE::Shared::Minidb;   # if using MCE::Shared->minidb
989        # use MCE::Shared::Ordhash;  # if using MCE::Shared->ordhash
990        # use MCE::Shared::Queue;    # if using MCE::Shared->queue
991        # use MCE::Shared::Scalar;   # if using MCE::Shared->scalar
992
993        # Et cetera. Only load modules needed for your application.
994
995        use MCE::Shared::Sequence;   # if using MCE::Shared->sequence
996
997        my $seq = MCE::Shared->sequence( 1, 9 );
998
999        sub task {
1000            my ( $id ) = @_;
1001            while ( defined ( my $num = $seq->next() ) ) {
1002                print "$id: $num\n";
1003                sleep 1;
1004            }
1005        }
1006
1007        sub main {
1008            MCE::Child->new( \&task, $_ ) for 1 .. 3;
1009            MCE::Child->wait_all();
1010        }
1011
1012        # Main must run inside a thread on the Windows platform or workers
1013        # will fail duing exiting, causing the exe to crash. The reason is
1014        # that PAR or a dependency isn't multi-process safe.
1015
1016        ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
1017
1018        threads->exit(0) if $INC{"threads.pm"};
1019

LIMITATION

1021       MCE::Child emits an error when "is_joinable", "is_running", and "join"
1022       isn't called by the managed process, where the child was spawned. This
1023       is a limitation in MCE::Child only due to not involving a shared-
1024       manager process for IPC.
1025
1026       This use-case is not typical.
1027

CREDITS

1029       The inspiration for "MCE::Child" comes from wanting "threads"-like
1030       behavior for processes compatible with Perl 5.8. Both can run side-by-
1031       side including safe-use by MCE workers. Likewise, the documentation
1032       resembles "threads".
1033
1034       The inspiration for "wait_all" and "wait_one" comes from the
1035       "Parallel::WorkUnit" module.
1036

SEE ALSO

1038       •  forks
1039
1040       •  forks::BerkeleyDB
1041
1042       •  MCE::Hobo
1043
1044       •  Parallel::ForkManager
1045
1046       •  Parallel::Loops
1047
1048       •  Parallel::Prefork
1049
1050       •  Parallel::WorkUnit
1051
1052       •  Proc::Fork
1053
1054       •  Thread::Tie
1055
1056       •  threads
1057

INDEX

1059       MCE, MCE::Channel, MCE::Shared
1060

AUTHOR

1062       Mario E. Roy, <marioeroy AT gmail DOT com>
1063
1064
1065
1066perl v5.36.0                      2023-01-20                     MCE::Child(3)
Impressum