1MCE::Child(3)         User Contributed Perl Documentation        MCE::Child(3)
2
3
4

NAME

6       MCE::Child - A threads-like parallelization module compatible with Perl
7       5.8
8

VERSION

10       This document describes MCE::Child version 1.874
11

SYNOPSIS

13        use MCE::Child;
14
15        MCE::Child->init(
16            max_workers => 'auto',   # default undef, unlimited
17            child_timeout => 20,     # default undef, no timeout
18            posix_exit => 1,         # default undef, CORE::exit
19            void_context => 1,       # default undef
20            on_start => sub {
21                my ( $pid, $ident ) = @_;
22                ...
23            },
24            on_finish => sub {
25                my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
26                ...
27            }
28        );
29
30        MCE::Child->create( sub { print "Hello from child\n" } )->join();
31
32        sub parallel {
33            my ($arg1) = @_;
34            print "Hello again, $arg1\n" if defined($arg1);
35            print "Hello again, $_\n"; # same thing
36        }
37
38        MCE::Child->create( \&parallel, $_ ) for 1 .. 3;
39
40        my @procs    = MCE::Child->list();
41        my @pids     = MCE::Child->list_pids();
42        my @running  = MCE::Child->list_running();
43        my @joinable = MCE::Child->list_joinable();
44        my $count    = MCE::Child->pending();
45
46        # Joining is orderly, e.g. child1 is joined first, child2, child3.
47        $_->join() for @procs;   # (or)
48        $_->join() for @joinable;
49
50        # Joining occurs immediately as child processes complete execution.
51        1 while MCE::Child->wait_one();
52
53        my $child = mce_child { foreach (@files) { ... } };
54
55        $child->join();
56
57        if ( my $err = $child->error() ) {
58            warn "Child error: $err\n";
59        }
60
61        # Get a child's object
62        $child = MCE::Child->self();
63
64        # Get a child's ID
65        $pid = MCE::Child->pid();  # $$
66        $pid = $child->pid();
67        $pid = MCE::Child->tid();  # tid is an alias for pid
68        $pid = $child->tid();
69
70        # Test child objects
71        if ( $child1 == $child2 ) {
72            ...
73        }
74
75        # Give other workers a chance to run
76        MCE::Child->yield();
77        MCE::Child->yield(0.05);
78
79        # Return context, wantarray aware
80        my ($value1, $value2) = $child->join();
81        my $value = $child->join();
82
83        # Check child's state
84        if ( $child->is_running() ) {
85            sleep 1;
86        }
87        if ( $child->is_joinable() ) {
88            $child->join();
89        }
90
91        # Send a signal to a child
92        $child->kill('SIGUSR1');
93
94        # Exit a child
95        MCE::Child->exit(0);
96        MCE::Child->exit(0, @ret);
97

DESCRIPTION

99       MCE::Child is a fork of MCE::Hobo for compatibility with Perl 5.8.
100
101       A child is a migratory worker inside the machine that carries the
102       asynchronous gene. Child processes are equipped with "threads"-like
103       capability for running code asynchronously. Unlike threads, each child
104       is a unique process to the underlying OS. The IPC is handled via
105       "MCE::Channel", which runs on all the major platforms including Cygwin
106       and Strawberry Perl.
107
108       "MCE::Child" may be used as a standalone or together with "MCE"
109       including running alongside "threads".
110
111        use MCE::Child;
112        use MCE::Shared;
113
114        # synopsis: head -20 file.txt | perl script.pl
115
116        my $ifh = MCE::Shared->handle( "<", \*STDIN  );  # shared
117        my $ofh = MCE::Shared->handle( ">", \*STDOUT );
118        my $ary = MCE::Shared->array();
119
120        sub parallel_task {
121            my ( $id ) = @_;
122            while ( <$ifh> ) {
123                printf {$ofh} "[ %4d ] %s", $., $_;
124              # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n";    # dereferencing
125                $ary->set( $. - 1, "[ ID $id ] read line $.\n" );  # faster via OO
126            }
127        }
128
129        my $child1 = MCE::Child->new( "parallel_task", 1 );
130        my $child2 = MCE::Child->new( \&parallel_task, 2 );
131        my $child3 = MCE::Child->new( sub { parallel_task(3) } );
132
133        $_->join for MCE::Child->list();  # ditto: MCE::Child->wait_all();
134
135        # search array (total one round-trip via IPC)
136        my @vals = $ary->vals( "val =~ / ID 2 /" );
137
138        print {*STDERR} join("", @vals);
139

API DOCUMENTATION

141       $child = MCE::Child->create( FUNCTION, ARGS )
142       $child = MCE::Child->new( FUNCTION, ARGS )
143          This will create a new child process that will begin execution with
144          function as the entry point, and optionally ARGS for list of
145          parameters. It will return the corresponding MCE::Child object, or
146          undef if child creation failed.
147
148          FUNCTION may either be the name of a function, an anonymous
149          subroutine, or a code ref.
150
151           my $child = MCE::Child->create( "func_name", ... );
152               # or
153           my $child = MCE::Child->create( sub { ... }, ... );
154               # or
155           my $child = MCE::Child->create( \&func, ... );
156
157       $child = MCE::Child->create( { options }, FUNCTION, ARGS )
158       $child = MCE::Child->create( IDENT, FUNCTION, ARGS )
159          Options, excluding "ident", may be specified globally via the "init"
160          function.  Otherwise, "ident", "child_timeout", "posix_exit", and
161          "void_context" may be set uniquely.
162
163          The "ident" option is used by callback functions "on_start" and
164          "on_finish" for identifying the started and finished child process
165          respectively.
166
167           my $child1 = MCE::Child->create( { posix_exit => 1 }, sub {
168               ...
169           } );
170
171           $child1->join;
172
173           my $child2 = MCE::Child->create( { child_timeout => 3 }, sub {
174               sleep 1 for ( 1 .. 9 );
175           } );
176
177           $child2->join;
178
179           if ( $child2->error() eq "Child timed out\n" ) {
180               ...
181           }
182
183          The "new()" method is an alias for "create()".
184
185       mce_child { BLOCK } ARGS;
186       mce_child { BLOCK };
187          "mce_child" runs the block asynchronously similarly to
188          "MCE::Child->create()".  It returns the child object, or undef if
189          child creation failed.
190
191           my $child = mce_child { foreach (@files) { ... } };
192
193           $child->join();
194
195           if ( my $err = $child->error() ) {
196               warn("Child error: $err\n");
197           }
198
199       $child->join()
200          This will wait for the corresponding child process to complete its
201          execution.  In non-voided context, "join()" will return the value(s)
202          of the entry point function.
203
204          The context (void, scalar or list) for the return value(s) for
205          "join" is determined at the time of joining and mostly "wantarray"
206          aware.
207
208           my $child1 = MCE::Child->create( sub {
209               my @res = qw(foo bar baz);
210               return (@res);
211           });
212
213           my @res1 = $child1->join();  # ( foo, bar, baz )
214           my $res1 = $child1->join();  #   baz
215
216           my $child2 = MCE::Child->create( sub {
217               return 'foo';
218           });
219
220           my @res2 = $child2->join();  # ( foo )
221           my $res2 = $child2->join();  #   foo
222
223       $child1->equal( $child2 )
224          Tests if two child objects are the same child or not. Child
225          comparison is based on process IDs. This is overloaded to the more
226          natural forms.
227
228           if ( $child1 == $child2 ) {
229               print("Child objects are the same\n");
230           }
231           # or
232           if ( $child1 != $child2 ) {
233               print("Child objects differ\n");
234           }
235
236       $child->error()
237          Child processes are executed in an "eval" context. This method will
238          return "undef" if the child terminates normally. Otherwise, it
239          returns the value of $@ associated with the child's execution status
240          in its "eval" context.
241
242       $child->exit()
243          This sends 'SIGQUIT' to the child process, notifying the child to
244          exit.  It returns the child object to allow for method chaining. It
245          is important to join later if not immediately to not leave a zombie
246          or defunct process.
247
248           $child->exit()->join();
249           ...
250
251           $child->join();  # later
252
253       MCE::Child->exit( 0 )
254       MCE::Child->exit( 0, @ret )
255          A child can exit at any time by calling "MCE::Child->exit()".
256          Otherwise, the behavior is the same as "exit(status)" when called
257          from the main process. The child process may optionally return data,
258          to be sent via IPC.
259
260       MCE::Child->finish()
261          This class method is called automatically by "END", but may be
262          called explicitly. An error is emitted via croak if there are active
263          child processes not yet joined.
264
265           MCE::Child->create( 'task1', $_ ) for 1 .. 4;
266           $_->join for MCE::Child->list();
267
268           MCE::Child->create( 'task2', $_ ) for 1 .. 4;
269           $_->join for MCE::Child->list();
270
271           MCE::Child->create( 'task3', $_ ) for 1 .. 4;
272           $_->join for MCE::Child->list();
273
274           MCE::Child->finish();
275
276       MCE::Child->init( options )
277          The init function accepts a list of MCE::Child options.
278
279           MCE::Child->init(
280               max_workers => 'auto',   # default undef, unlimited
281               child_timeout => 20,     # default undef, no timeout
282               posix_exit => 1,         # default undef, CORE::exit
283               void_context => 1,       # default undef
284               on_start => sub {
285                   my ( $pid, $ident ) = @_;
286                   ...
287               },
288               on_finish => sub {
289                   my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
290                   ...
291               }
292           );
293
294           # Identification given as an option or the 1st argument.
295
296           for my $key ( 'aa' .. 'zz' ) {
297               MCE::Child->create( { ident => $key }, sub { ... } );
298               MCE::Child->create( $key, sub { ... } );
299           }
300
301           MCE::Child->wait_all;
302
303          Set "max_workers" if you want to limit the number of workers by
304          waiting automatically for an available slot. Specify "auto" to
305          obtain the number of logical cores via "MCE::Util::get_ncpu()".
306
307          Set "child_timeout", in number of seconds, if you want the child
308          process to terminate after some time. The default is 0 for no
309          timeout.
310
311          Set "posix_exit" to avoid all END and destructor processing. It is
312          implied (set to 1) when constructing MCE::Child inside a thread, or
313          when CGI, FCGI, Coro, Curses, Gearman::Util, Gearman::XS,
314          LWP::UserAgent, Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI is present.
315
316          Set "void_context" to create the child process in void context for
317          the return value. Otherwise, the return context is wantarray-aware
318          for "join()" and "result()" and determined when retrieving the data.
319
320          The callback options "on_start" and "on_finish" are called in the
321          parent process after starting the worker and later when terminated.
322          The arguments for the subroutines were inspired by
323          Parallel::ForkManager.
324
325          The parameters for "on_start" are the following:
326
327           - pid of the child process
328           - identification (ident option or 1st arg to create)
329
330          The parameters for "on_finish" are the following:
331
332           - pid of the child process
333           - program exit code
334           - identification (ident option or 1st arg to create)
335           - exit signal id
336           - error message from eval inside MCE::Child
337           - returned data
338
339       $child->is_running()
340          Returns true if a child is still running.
341
342       $child->is_joinable()
343          Returns true if the child has finished running and not yet joined.
344
345       $child->kill( 'SIG...' )
346          Sends the specified signal to the child. Returns the child object to
347          allow for method chaining. As with "exit", it is important to join
348          eventually if not immediately to not leave a zombie or defunct
349          process.
350
351           $child->kill('SIG...')->join();
352
353          The following is a parallel demonstration comparing "MCE::Shared"
354          against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
355          after all workers have been notified to quit.
356
357           use Time::HiRes qw(sleep time);
358
359           use Redis;
360           use Redis::Fast;
361
362           use MCE::Child;
363           use MCE::Shared;
364
365           my $redis = Redis->new();
366           my $rfast = Redis::Fast->new();
367           my $array = MCE::Shared->array();
368
369           sub parallel_redis {
370               my ($_redis) = @_;
371               my ($count, $quit, $len) = (0, 0);
372
373               # instead, use a flag to exit loop
374               $SIG{'QUIT'} = sub { $quit = 1 };
375
376               while () {
377                   $len = $_redis->rpush('list', $count++);
378                   last if $quit;
379               }
380
381               $count;
382           }
383
384           sub parallel_array {
385               my ($count, $quit, $len) = (0, 0);
386
387               # do not exit from inside handler
388               $SIG{'QUIT'} = sub { $quit = 1 };
389
390               while () {
391                   $len = $array->push($count++);
392                   last if $quit;
393               }
394
395               $count;
396           }
397
398           sub benchmark_this {
399               my ($desc, $num_procs, $timeout, $code, @args) = @_;
400               my ($start, $total) = (time(), 0);
401
402               MCE::Child->new($code, @args) for 1..$num_procs;
403               sleep $timeout;
404
405               # joining is not immediate; ok
406               $_->kill('QUIT') for MCE::Child->list();
407
408               # joining later; ok
409               $total += $_->join() for MCE::Child->list();
410
411               printf "$desc <> duration: %0.03f secs, count: $total\n",
412                   time() - $start;
413
414               sleep 0.2;
415           }
416
417           benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
418           benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
419           benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
420
421       MCE::Child->list()
422          Returns a list of all child objects not yet joined.
423
424           @procs = MCE::Child->list();
425
426       MCE::Child->list_pids()
427          Returns a list of all child pids not yet joined (available since
428          1.849).
429
430           @pids = MCE::Child->list_pids();
431
432           $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
433               # Signal workers all at once
434               CORE::kill('KILL', MCE::Child->list_pids());
435               exec('reset');
436           };
437
438       MCE::Child->list_running()
439          Returns a list of all child objects that are still running.
440
441           @procs = MCE::Child->list_running();
442
443       MCE::Child->list_joinable()
444          Returns a list of all child objects that have completed running.
445          Thus, ready to be joined without blocking.
446
447           @procs = MCE::Child->list_joinable();
448
449       MCE::Child->max_workers([ N ])
450          Getter and setter for max_workers. Specify a number or 'auto' to
451          acquire the total number of cores via MCE::Util::get_ncpu. Specify a
452          false value to set back to no limit.
453
454       MCE::Child->pending()
455          Returns a count of all child objects not yet joined.
456
457           $count = MCE::Child->pending();
458
459       $child->result()
460          Returns the result obtained by "join", "wait_one", or "wait_all". If
461          the process has not yet exited, waits for the corresponding child to
462          complete its execution.
463
464           use MCE::Child;
465           use Time::HiRes qw(sleep);
466
467           sub task {
468               my ($id) = @_;
469               sleep $id * 0.333;
470               return $id;
471           }
472
473           MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
474
475           # 1 while MCE::Child->wait_one();
476
477           while ( my $child = MCE::Child->wait_one() ) {
478               my $err = $child->error() || 'no error';
479               my $res = $child->result();
480               my $pid = $child->pid();
481
482               print "[$pid] $err : $res\n";
483           }
484
485          Like "join" described above, the context (void, scalar or list) for
486          the return value(s) is determined at the time "result" is called and
487          mostly "wantarray" aware.
488
489           my $child1 = MCE::Child->create( sub {
490               my @res = qw(foo bar baz);
491               return (@res);
492           });
493
494           my @res1 = $child1->result();  # ( foo, bar, baz )
495           my $res1 = $child1->result();  #   baz
496
497           my $child2 = MCE::Child->create( sub {
498               return 'foo';
499           });
500
501           my @res2 = $child2->result();  # ( foo )
502           my $res2 = $child2->result();  #   foo
503
504       MCE::Child->self()
505          Class method that allows a child to obtain its own MCE::Child
506          object.
507
508       $child->pid()
509       $child->tid()
510          Returns the ID of the child.
511
512           pid: $$  process id
513           tid: $$  alias for pid
514
515       MCE::Child->pid()
516       MCE::Child->tid()
517          Class methods that allow a child to obtain its own ID.
518
519           pid: $$  process id
520           tid: $$  alias for pid
521
522       MCE::Child->wait_one()
523       MCE::Child->waitone()
524       MCE::Child->wait_all()
525       MCE::Child->waitall()
526          Meaningful for the manager process only, waits for one or all child
527          processes to complete execution. Afterwards, returns the
528          corresponding child objects.  If a child doesn't exist, returns the
529          "undef" value or an empty list for "wait_one" and "wait_all"
530          respectively.
531
532          The "waitone" and "waitall" methods are aliases for compatibility
533          with "MCE::Hobo".
534
535           use MCE::Child;
536           use Time::HiRes qw(sleep);
537
538           sub task {
539               my $id = shift;
540               sleep $id * 0.333;
541               return $id;
542           }
543
544           MCE::Child->create('task', $_) for ( reverse 1 .. 3 );
545
546           # join, traditional use case
547           $_->join() for MCE::Child->list();
548
549           # wait_one, simplistic use case
550           1 while MCE::Child->wait_one();
551
552           # wait_one
553           while ( my $child = MCE::Child->wait_one() ) {
554               my $err = $child->error() || 'no error';
555               my $res = $child->result();
556               my $pid = $child->pid();
557
558               print "[$pid] $err : $res\n";
559           }
560
561           # wait_all
562           my @procs = MCE::Child->wait_all();
563
564           for ( @procs ) {
565               my $err = $_->error() || 'no error';
566               my $res = $_->result();
567               my $pid = $_->pid();
568
569               print "[$pid] $err : $res\n";
570           }
571
572       MCE::Child->yield( [ floating_seconds ] )
573          Give other workers a chance to run, optionally for a given time. Yield
574          behaves similarly to MCE's interval option. It throttles workers
575          from running too fast.  A demonstration is provided in the next
576          section for fetching URLs in parallel.
577
578          The default "floating_seconds" is 0.008 and 0.015 on UNIX and
579          Windows, respectively. Pass 0 if simply wanting to give other
580          workers a chance to run.
581
582           # total run time: 1.00 second
583
584           MCE::Child->create( sub { MCE::Child->yield(0.25) } ) for 1 .. 4;
585           MCE::Child->wait_all();
586

THREADS-like DETACH CAPABILITY

588       Threads-like detach capability was added starting with the 1.867
589       release.
590
591       A threads example is shown first followed by the MCE::Child example.
592       All one needs to do is set the CHLD signal handler to IGNORE.
593       Unfortunately, this works on UNIX platforms only. The child process
594       restores the CHLD handler to default, so is able to deeply spin workers
595       and reap if desired.
596
597        use threads;
598
599        for ( 1 .. 8 ) {
600            async {
601                # do something
602            }->detach;
603        }
604
605        use MCE::Child;
606
607        # Have the OS reap workers automatically when exiting.
608        # The on_finish option is ignored if specified (no-op).
609        # Ensure not inside a thread on UNIX platforms.
610
611        $SIG{CHLD} = 'IGNORE';
612
613        for ( 1 .. 8 ) {
614            mce_child {
615                # do something
616            };
617        }
618
619        # Optionally, wait for any remaining workers before leaving.
620        # This is necessary if workers are consuming shared objects,
621        # constructed via MCE::Shared.
622
623        MCE::Child->wait_all;
624
625       The following is another way and works on Windows.  Here, the on_finish
626       handler works as usual.
627
628        use MCE::Child;
629
630        MCE::Child->init(
631            on_finish => sub {
632                ...
633            },
634        );
635
636        for ( 1 .. 8 ) {
637            $_->join for MCE::Child->list_joinable;
638            mce_child {
639                # do something
640            };
641        }
642
643        MCE::Child->wait_all;
644

PARALLEL::FORKMANAGER-like DEMONSTRATION

646       MCE::Child behaves similarly to threads for the most part. It also
647       provides Parallel::ForkManager-like capabilities. The
648       "Parallel::ForkManager" example is shown first followed by a version
649       using "MCE::Child".
650
651       Parallel::ForkManager
652           use strict;
653           use warnings;
654
655           use Parallel::ForkManager;
656           use Time::HiRes 'time';
657
658           my $start = time;
659
660           my $pm = Parallel::ForkManager->new(10);
661           $pm->set_waitpid_blocking_sleep(0);
662
663           $pm->run_on_finish( sub {
664               my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
665               print "child $pid completed: $ident => ", $resp->[0], "\n";
666           });
667
668           DATA_LOOP:
669           foreach my $data ( 1..2000 ) {
670               # forks and returns the pid for the child
671               my $pid = $pm->start($data) and next DATA_LOOP;
672               my $ret = [ $data * 2 ];
673
674               $pm->finish(0, $ret);
675           }
676
677           $pm->wait_all_children;
678
679           printf STDERR "duration: %0.03f seconds\n", time - $start;
680
681       MCE::Child
682           use strict;
683           use warnings;
684
685           use MCE::Child 1.843;
686           use Time::HiRes 'time';
687
688           my $start = time;
689
690           MCE::Child->init(
691               max_workers => 10,
692               on_finish   => sub {
693                   my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
694                   print "child $pid completed: $ident => ", $resp->[0], "\n";
695               }
696           );
697
698           foreach my $data ( 1..2000 ) {
699               MCE::Child->create( $data, sub {
700                   [ $data * 2 ];
701               });
702           }
703
704           MCE::Child->wait_all;
705
706           printf STDERR "duration: %0.03f seconds\n", time - $start;
707
708       Time to spin 2,000 workers and obtain results (in seconds).
709          Results were obtained on a MacBook Pro (2.6 GHz ~ 3.6 GHz with Turbo
710          Boost).  Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
711          with Moo loaded at the top of the script.
712
713           MCE::Hobo uses MCE::Shared to retrieve data during reaping.
714           MCE::Child uses MCE::Channel, no shared-manager.
715
716                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
717
718           MCE::Child 1.843  19.099s  17.091s  0.965s  1.534s  1.229s
719            MCE::Hobo 1.843  20.514s  19.594s  1.246s  1.629s  1.613s
720                P::FM 1.20   19.703s  19.235s  0.875s  1.445s  1.346s
721
722           MCE::Child 1.843  20.426s  18.417s  1.116s  1.632s  1.338s  Moo loaded
723            MCE::Hobo 1.843  21.809s  20.810s  1.407s  1.759s  1.722s  Moo loaded
724                P::FM 2.02   21.668s  25.927s  1.882s  2.612s  2.483s  Moo used
725
726       Set posix_exit to avoid all END and destructor processing.
727          This is helpful for reducing overhead when workers exit. Ditto if
728          using a Perl module not parallel safe. The option is ignored on
729          Windows "$^O eq 'MSWin32'".
730
731           MCE::Child->init( posix_exit => 1, ... );
732            MCE::Hobo->init( posix_exit => 1, ... );
733
734                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
735
736           MCE::Child 1.843  19.815s  ignored  0.824s  1.284s  1.245s  Moo loaded
737            MCE::Hobo 1.843  21.029s  ignored  0.953s  1.335s  1.439s  Moo loaded
738

PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT

740       This demonstration constructs two queues, two handles, starts the
741       shared-manager process if needed, and spawns four workers.  For this
742       demonstration, I am chunking 64 URLs per job. In reality, one may run
743       with 200 workers and chunk 300 URLs on a 24-way box.
744
745        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
746        # perl demo.pl              -- all output
747        # perl demo.pl  >/dev/null  -- mngr/child output
748        # perl demo.pl 2>/dev/null  -- show results only
749        #
750        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
751
752        use strict;
753        use warnings;
754
755        use AnyEvent;
756        use AnyEvent::HTTP;
757        use Time::HiRes qw( time );
758
759        use MCE::Child;
760        use MCE::Shared;
761
762        # Construct two queues, input and return.
763
764        my $que = MCE::Shared->queue();
765        my $ret = MCE::Shared->queue();
766
767        # Construct shared handles for serializing output from many workers
768        # writing simultaneously. This prevents garbled output.
769
770        mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
771        mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
772
773        # Spawn workers early for minimum memory consumption.
774
775        MCE::Child->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
776
777        # Obtain or generate input data for workers to process.
778
779        my ( $count, @urls ) = ( 0 );
780
781        push @urls, map { "http://127.0.0.$_/"   } 1..254;
782        push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
783
784        while ( @urls ) {
785            my @chunk = splice(@urls, 0, 64);
786            $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
787        }
788
789        # So that workers leave the loop after consuming the queue.
790
791        $que->end();
792
793        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
794        # Loop for the manager process. The manager may do other work if
795        # need be and periodically check $ret->pending() not shown here.
796        #
797        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
798
799        my $start = time;
800
801        printf {$ERR} "Mngr  - entering loop\n";
802
803        while ( $count ) {
804            my ( $result, $failed ) = $ret->dequeue( 2 );
805
806            # Remove ID from result, so not treated as a URL item.
807
808            printf {$ERR} "Mngr  - received job %s\n", delete $result->{ID};
809
810            # Display the URL and the size captured.
811
812            foreach my $url ( keys %{ $result } ) {
813                printf {$OUT} "%s: %d\n", $url, length($result->{$url})
814                    if $result->{$url};  # url has content
815            }
816
817            # Display URLs that could not be reached.
818
819            if ( @{ $failed } ) {
820                foreach my $url ( @{ $failed } ) {
821                    print {$OUT} "Failed: $url\n";
822                }
823            }
824
825            # Decrement the count.
826
827            $count--;
828        }
829
830        MCE::Child->wait_all();
831
832        printf {$ERR} "Mngr  - exiting loop\n\n";
833        printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
834
835        exit;
836
837        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
838        # Child processes enqueue two items ( $result and $failed ) per each
839        # job for the manager process. Likewise, the manager process dequeues
840        # two items above. Optionally, child processes may include the ID in
841        # the result.
842        #
843        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
844
845        sub task {
846            my ( $id ) = @_;
847            printf {$ERR} "Child $id entering loop\n";
848
849            while ( my $job = $que->dequeue() ) {
850                my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
851
852                # Walk URLs, provide a hash and array refs for data.
853
854                printf {$ERR} "Child $id running  job $job->{ID}\n";
855                walk( $job, $result, $failed );
856
857                # Send results to the manager process.
858
859                $ret->enqueue( $result, $failed );
860            }
861
862            printf {$ERR} "Child $id exiting loop\n";
863        }
864
865        sub walk {
866            my ( $job, $result, $failed ) = @_;
867
868            # Yielding is critical when running an event loop in parallel.
869            # Not doing so means that the app may reach contention points
870            # with the firewall and likely impose unnecessary hardship at
871            # the OS level. The idea here is not to have multiple workers
872            # initiate HTTP requests to a batch of URLs at the same time.
873            # Yielding behaves similarly to scatter, to have the child
874            # process run solo for a fraction of time.
875
876            MCE::Child->yield( 0.03 );
877
878            my $cv = AnyEvent->condvar();
879
880            # Populate the hash ref for the URLs it could reach.
881            # Do not mix AnyEvent timeout with child timeout.
882            # Therefore, choose event timeout when available.
883
884            foreach my $url ( @{ $job->{INPUT} } ) {
885                $cv->begin();
886                http_get $url, timeout => 2, sub {
887                    my ( $data, $headers ) = @_;
888                    $result->{$url} = $data;
889                    $cv->end();
890                };
891            }
892
893            $cv->recv();
894
895            # Populate the array ref for URLs it could not reach.
896
897            foreach my $url ( @{ $job->{INPUT} } ) {
898                push @{ $failed }, $url unless (exists $result->{ $url });
899            }
900
901            return;
902        }
903
904        __END__
905
906        $ perl demo.pl
907
908        Child 1 entering loop
909        Child 2 entering loop
910        Child 3 entering loop
911        Mngr  - entering loop
912        Child 2 running  job 2
913        Child 3 running  job 3
914        Child 1 running  job 1
915        Child 4 entering loop
916        Child 4 running  job 4
917        Child 2 running  job 5
918        Mngr  - received job 2
919        Child 3 running  job 6
920        Mngr  - received job 3
921        Child 1 running  job 7
922        Mngr  - received job 1
923        Child 4 running  job 8
924        Mngr  - received job 4
925        http://192.168.0.1/: 3729
926        Child 2 exiting loop
927        Mngr  - received job 5
928        Child 3 exiting loop
929        Mngr  - received job 6
930        Child 1 exiting loop
931        Mngr  - received job 7
932        Child 4 exiting loop
933        Mngr  - received job 8
934        Mngr  - exiting loop
935
936        Duration: 4.131 seconds
937

CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE

939       Making an executable is possible with the PAR::Packer module.  On the
940       Windows platform, threads, threads::shared, and exiting via threads are
941       necessary for the binary to exit successfully.
942
943        # https://metacpan.org/pod/PAR::Packer
944        # https://metacpan.org/pod/pp
945        #
946        #   pp -o demo.exe demo.pl
947        #   ./demo.exe
948
949        use strict;
950        use warnings;
951
952        use if $^O eq "MSWin32", "threads";
953        use if $^O eq "MSWin32", "threads::shared";
954
955        # Include minimum dependencies for MCE::Child.
956        # Add other modules required by your application here.
957
958        use Storable ();
959        use Time::HiRes ();
960
961        # use IO::FDPass ();  # optional: for condvar, handle, queue
962        # use Sereal ();      # optional: for faster serialization
963
964        use MCE::Child;
965        use MCE::Shared;
966
967        # For PAR to work on the Windows platform, one must include manually
968        # any shared modules used by the application.
969
970        # use MCE::Shared::Array;    # if using MCE::Shared->array
971        # use MCE::Shared::Cache;    # if using MCE::Shared->cache
972        # use MCE::Shared::Condvar;  # if using MCE::Shared->condvar
973        # use MCE::Shared::Handle;   # if using MCE::Shared->handle, mce_open
974        # use MCE::Shared::Hash;     # if using MCE::Shared->hash
975        # use MCE::Shared::Minidb;   # if using MCE::Shared->minidb
976        # use MCE::Shared::Ordhash;  # if using MCE::Shared->ordhash
977        # use MCE::Shared::Queue;    # if using MCE::Shared->queue
978        # use MCE::Shared::Scalar;   # if using MCE::Shared->scalar
979
980        # Et cetera. Only load modules needed for your application.
981
982        use MCE::Shared::Sequence;   # if using MCE::Shared->sequence
983
984        my $seq = MCE::Shared->sequence( 1, 9 );
985
986        sub task {
987            my ( $id ) = @_;
988            while ( defined ( my $num = $seq->next() ) ) {
989                print "$id: $num\n";
990                sleep 1;
991            }
992        }
993
994        sub main {
995            MCE::Child->new( \&task, $_ ) for 1 .. 3;
996            MCE::Child->wait_all();
997        }
998
999        # Main must run inside a thread on the Windows platform or workers
1000        # will fail during exit, causing the exe to crash. The reason is
1001        # that PAR or a dependency isn't multi-process safe.
1002
1003        ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
1004
1005        threads->exit(0) if $INC{"threads.pm"};
1006

LIMITATION

1008       MCE::Child emits an error when "is_joinable", "is_running", or "join"
1009       is not called by the managed process, where the child was spawned. This
1010       is a limitation in MCE::Child only, due to not involving a shared-
1011       manager process for IPC.
1012
1013       This use-case is not typical.
1014

CREDITS

1016       The inspiration for "MCE::Child" comes from wanting "threads"-like
1017       behavior for processes compatible with Perl 5.8. Both can run side-by-
1018       side including safe-use by MCE workers. Likewise, the documentation
1019       resembles "threads".
1020
1021       The inspiration for "wait_all" and "wait_one" comes from the
1022       "Parallel::WorkUnit" module.
1023

SEE ALSO

1025       ·  forks
1026
1027       ·  forks::BerkeleyDB
1028
1029       ·  MCE::Hobo
1030
1031       ·  Parallel::ForkManager
1032
1033       ·  Parallel::Loops
1034
1035       ·  Parallel::Prefork
1036
1037       ·  Parallel::WorkUnit
1038
1039       ·  Proc::Fork
1040
1041       ·  Thread::Tie
1042
1043       ·  threads
1044

INDEX

1046       MCE, MCE::Channel, MCE::Shared
1047

AUTHOR

1049       Mario E. Roy, <marioeroy AT gmail DOT com>
1050
1051
1052
1053perl v5.32.0                      2020-08-19                     MCE::Child(3)
Impressum