1MCE::Hobo(3)          User Contributed Perl Documentation         MCE::Hobo(3)
2
3
4

NAME

6       MCE::Hobo - A threads-like parallelization module
7

VERSION

9       This document describes MCE::Hobo version 1.873
10

SYNOPSIS

12        use MCE::Hobo;
13
14        MCE::Hobo->init(
15            max_workers => 'auto',   # default undef, unlimited
16            hobo_timeout => 20,      # default undef, no timeout
17            posix_exit => 1,         # default undef, CORE::exit
18            void_context => 1,       # default undef
19            on_start => sub {
20                my ( $pid, $ident ) = @_;
21                ...
22            },
23            on_finish => sub {
24                my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
25                ...
26            }
27        );
28
29        MCE::Hobo->create( sub { print "Hello from hobo\n" } )->join();
30
31        sub parallel {
32            my ($arg1) = @_;
33            print "Hello again, $arg1\n" if defined($arg1);
34            print "Hello again, $_\n"; # same thing
35        }
36
37        MCE::Hobo->create( \&parallel, $_ ) for 1 .. 3;
38
39        my @hobos    = MCE::Hobo->list();
40        my @pids     = MCE::Hobo->list_pids();
41        my @running  = MCE::Hobo->list_running();
42        my @joinable = MCE::Hobo->list_joinable();
43        my $count    = MCE::Hobo->pending();
44
45        # Joining is orderly, e.g. hobo1 is joined first, hobo2, hobo3.
46        $_->join() for @hobos;   # (or)
47        $_->join() for @joinable;
48
49        # Joining occurs immediately as hobo processes complete execution.
50        1 while MCE::Hobo->wait_one();
51
52        my $hobo = mce_async { foreach (@files) { ... } };
53
54        $hobo->join();
55
56        if ( my $err = $hobo->error() ) {
57            warn "Hobo error: $err\n";
58        }
59
60        # Get a hobo's object
61        $hobo = MCE::Hobo->self();
62
63        # Get a hobo's ID
64        $pid = MCE::Hobo->pid();  # $$
65        $pid = $hobo->pid();
66        $pid = MCE::Hobo->tid();  # tid is an alias for pid
67        $pid = $hobo->tid();
68
69        # Test hobo objects
70        if ( $hobo1 == $hobo2 ) {
71            ...
72        }
73
74        # Give other workers a chance to run
75        MCE::Hobo->yield();
76        MCE::Hobo->yield(0.05);
77
78        # Return context, wantarray aware
79        my ($value1, $value2) = $hobo->join();
80        my $value = $hobo->join();
81
82        # Check hobo's state
83        if ( $hobo->is_running() ) {
84            sleep 1;
85        }
86        if ( $hobo->is_joinable() ) {
87            $hobo->join();
88        }
89
90        # Send a signal to a hobo
91        $hobo->kill('SIGUSR1');
92
93        # Exit a hobo
94        MCE::Hobo->exit(0);
95        MCE::Hobo->exit(0, @ret);  # MCE::Hobo 1.827+
96

DESCRIPTION

98       A hobo is a migratory worker inside the machine that carries the
99       asynchronous gene. Hobo processes are equipped with "threads"-like
100       capability for running code asynchronously. Unlike threads, each hobo
101       is a unique process to the underlying OS. The IPC is managed by
102       "MCE::Shared", which runs on all the major platforms including Cygwin
103       and Strawberry Perl.
104
105       An exception was made on the Windows platform to spawn threads versus
106       children in "MCE::Hobo" 1.807 through 1.816. For consistency, the 1.817
107       release reverts back to spawning children on all supported platforms.
108
109       "MCE::Hobo" may be used as a standalone or together with "MCE"
110       including running alongside "threads".
111
112        use MCE::Hobo;
113        use MCE::Shared;
114
115        # synopsis: head -20 file.txt | perl script.pl
116
117        my $ifh = MCE::Shared->handle( "<", \*STDIN  );  # shared
118        my $ofh = MCE::Shared->handle( ">", \*STDOUT );
119        my $ary = MCE::Shared->array();
120
121        sub parallel_task {
122            my ( $id ) = @_;
123            while ( <$ifh> ) {
124                printf {$ofh} "[ %4d ] %s", $., $_;
125              # $ary->[ $. - 1 ] = "[ ID $id ] read line $.\n";    # dereferencing
126                $ary->set( $. - 1, "[ ID $id ] read line $.\n" );  # faster via OO
127            }
128        }
129
130        my $hobo1 = MCE::Hobo->new( "parallel_task", 1 );
131        my $hobo2 = MCE::Hobo->new( \&parallel_task, 2 );
132        my $hobo3 = MCE::Hobo->new( sub { parallel_task(3) } );
133
134        $_->join for MCE::Hobo->list();  # ditto: MCE::Hobo->wait_all();
135
136        # search array (total one round-trip via IPC)
137        my @vals = $ary->vals( "val =~ / ID 2 /" );
138
139        print {*STDERR} join("", @vals);
140

API DOCUMENTATION

142       $hobo = MCE::Hobo->create( FUNCTION, ARGS )
143       $hobo = MCE::Hobo->new( FUNCTION, ARGS )
144          This will create a new hobo process that will begin execution with
145          function as the entry point, and optionally ARGS for list of
146          parameters. It will return the corresponding MCE::Hobo object, or
147          undef if hobo creation failed.
148
149          FUNCTION may either be the name of a function, an anonymous
150          subroutine, or a code ref.
151
152           my $hobo = MCE::Hobo->create( "func_name", ... );
153               # or
154           my $hobo = MCE::Hobo->create( sub { ... }, ... );
155               # or
156           my $hobo = MCE::Hobo->create( \&func, ... );
157
158       $hobo = MCE::Hobo->create( { options }, FUNCTION, ARGS )
159       $hobo = MCE::Hobo->create( IDENT, FUNCTION, ARGS )
160          Options, excluding "ident", may be specified globally via the "init"
161          function.  Otherwise, "ident", "hobo_timeout", "posix_exit", and
162          "void_context" may be set uniquely.
163
164          The "ident" option, available since 1.827, is used by callback
165          functions "on_start" and "on_finish" for identifying the started and
166          finished hobo process respectively.
167
168           my $hobo1 = MCE::Hobo->create( { posix_exit => 1 }, sub {
169               ...
170           } );
171
172           $hobo1->join;
173
174           my $hobo2 = MCE::Hobo->create( { hobo_timeout => 3 }, sub {
175               sleep 1 for ( 1 .. 9 );
176           } );
177
178           $hobo2->join;
179
180           if ( $hobo2->error() eq "Hobo timed out\n" ) {
181               ...
182           }
183
184          The "new()" method is an alias for "create()".
185
186       mce_async { BLOCK } ARGS;
187       mce_async { BLOCK };
188          "mce_async" runs the block asynchronously similarly to
189          "MCE::Hobo->create()".  It returns the hobo object, or undef if hobo
190          creation failed.
191
192           my $hobo = mce_async { foreach (@files) { ... } };
193
194           $hobo->join();
195
196           if ( my $err = $hobo->error() ) {
197               warn("Hobo error: $err\n");
198           }
199
200       $hobo->join()
201          This will wait for the corresponding hobo process to complete its
202          execution.  In non-voided context, "join()" will return the value(s)
203          of the entry point function.
204
205          The context (void, scalar or list) for the return value(s) for
206          "join" is determined at the time of joining and mostly "wantarray"
207          aware.
208
209           my $hobo1 = MCE::Hobo->create( sub {
210               my @res = qw(foo bar baz);
211               return (@res);
212           });
213
214           my @res1 = $hobo1->join();  # ( foo, bar, baz )
215           my $res1 = $hobo1->join();  #   baz
216
217           my $hobo2 = MCE::Hobo->create( sub {
218               return 'foo';
219           });
220
221           my @res2 = $hobo2->join();  # ( foo )
222           my $res2 = $hobo2->join();  #   foo
223
224       $hobo1->equal( $hobo2 )
225          Tests if two hobo objects are the same hobo or not. Hobo comparison
226          is based on process IDs. This is overloaded to the more natural
227          forms.
228
229           if ( $hobo1 == $hobo2 ) {
230               print("Hobo objects are the same\n");
231           }
232           # or
233           if ( $hobo1 != $hobo2 ) {
234               print("Hobo objects differ\n");
235           }
236
237       $hobo->error()
238          Hobo processes are executed in an "eval" context. This method will
239          return "undef" if the hobo terminates normally. Otherwise, it
240          returns the value of $@ associated with the hobo's execution status
241          in its "eval" context.
242
243       $hobo->exit()
244          This sends 'SIGQUIT' to the hobo process, notifying the hobo to
245          exit.  It returns the hobo object to allow for method chaining. It
246          is important to join later if not immediately to not leave a zombie
247          or defunct process.
248
249           $hobo->exit()->join();
250           ...
251
252           $hobo->join();  # later
253
254       MCE::Hobo->exit( 0 )
255       MCE::Hobo->exit( 0, @ret )
256          A hobo can exit at any time by calling "MCE::Hobo->exit()".
257          Otherwise, the behavior is the same as "exit(status)" when called
258          from the main process. Current since 1.827, the hobo process may
259          optionally return data, to be sent via IPC.
260
261       MCE::Hobo->finish()
262          This class method is called automatically by "END", but may be
263          called explicitly. An error is emitted via croak if there are active
264          hobo processes not yet joined.
265
266           MCE::Hobo->create( 'task1', $_ ) for 1 .. 4;
267           $_->join for MCE::Hobo->list();
268
269           MCE::Hobo->create( 'task2', $_ ) for 1 .. 4;
270           $_->join for MCE::Hobo->list();
271
272           MCE::Hobo->create( 'task3', $_ ) for 1 .. 4;
273           $_->join for MCE::Hobo->list();
274
275           MCE::Hobo->finish();
276
277       MCE::Hobo->init( options )
278          The init function accepts a list of MCE::Hobo options.
279
280           MCE::Hobo->init(
281               max_workers => 'auto',   # default undef, unlimited
282               hobo_timeout => 20,      # default undef, no timeout
283               posix_exit => 1,         # default undef, CORE::exit
284               void_context => 1,       # default undef
285               on_start => sub {
286                   my ( $pid, $ident ) = @_;
287                   ...
288               },
289               on_finish => sub {
290                   my ( $pid, $exit, $ident, $signal, $error, @ret ) = @_;
291                   ...
292               }
293           );
294
295           # Identification given as an option or the 1st argument.
296           # Current API available since 1.827.
297
298           for my $key ( 'aa' .. 'zz' ) {
299               MCE::Hobo->create( { ident => $key }, sub { ... } );
300               MCE::Hobo->create( $key, sub { ... } );
301           }
302
303           MCE::Hobo->wait_all;
304
305          Set "max_workers" if you want to limit the number of workers by
306          waiting automatically for an available slot. Specify "auto" to
307          obtain the number of logical cores via "MCE::Util::get_ncpu()".
308
309          Set "hobo_timeout", in number of seconds, if you want the hobo
310          process to terminate after some time. The default is 0 for no
311          timeout.
312
313          Set "posix_exit" to avoid all END and destructor processing.
314          Constructing MCE::Hobo inside a thread implies 1 or if present CGI,
315          FCGI, Coro, Curses, Gearman::Util, Gearman::XS, LWP::UserAgent,
316          Mojo::IOLoop, STFL, Tk, Wx, or Win32::GUI.
317
318          Set "void_context" to create the hobo process in void context for
319          the return value. Otherwise, the return context is wantarray-aware
320          for "join()" and "result()" and determined when retrieving the data.
321
322          The callback options "on_start" and "on_finish" are called in the
323          parent process after starting the worker and later when terminated.
324          The arguments for the subroutines were inspired by
325          Parallel::ForkManager.
326
327          The parameters for "on_start" are the following:
328
329           - pid of the hobo process
330           - identification (ident option or 1st arg to create)
331
332          The parameters for "on_finish" are the following:
333
334           - pid of the hobo process
335           - program exit code
336           - identification (ident option or 1st arg to create)
337           - exit signal id
338           - error message from eval inside MCE::Hobo
339           - returned data
340
341       $hobo->is_running()
342          Returns true if a hobo is still running.
343
344       $hobo->is_joinable()
345          Returns true if the hobo has finished running and not yet joined.
346
347       $hobo->kill( 'SIG...' )
348          Sends the specified signal to the hobo. Returns the hobo object to
349          allow for method chaining. As with "exit", it is important to join
350          eventually if not immediately to not leave a zombie or defunct
351          process.
352
353           $hobo->kill('SIG...')->join();
354
355          The following is a parallel demonstration comparing "MCE::Shared"
356          against "Redis" and "Redis::Fast" on a Fedora 23 VM. Joining begins
357          after all workers have been notified to quit.
358
359           use Time::HiRes qw(sleep time);
360
361           use Redis;
362           use Redis::Fast;
363
364           use MCE::Hobo;
365           use MCE::Shared;
366
367           my $redis = Redis->new();
368           my $rfast = Redis::Fast->new();
369           my $array = MCE::Shared->array();
370
371           sub parallel_redis {
372               my ($_redis) = @_;
373               my ($count, $quit, $len) = (0, 0);
374
375               # instead, use a flag to exit loop
376               $SIG{'QUIT'} = sub { $quit = 1 };
377
378               while () {
379                   $len = $_redis->rpush('list', $count++);
380                   last if $quit;
381               }
382
383               $count;
384           }
385
386           sub parallel_array {
387               my ($count, $quit, $len) = (0, 0);
388
389               # do not exit from inside handler
390               $SIG{'QUIT'} = sub { $quit = 1 };
391
392               while () {
393                   $len = $array->push($count++);
394                   last if $quit;
395               }
396
397               $count;
398           }
399
400           sub benchmark_this {
401               my ($desc, $num_procs, $timeout, $code, @args) = @_;
402               my ($start, $total) = (time(), 0);
403
404               MCE::Hobo->new($code, @args) for 1..$num_procs;
405               sleep $timeout;
406
407               # joining is not immediate; ok
408               $_->kill('QUIT') for MCE::Hobo->list();
409
410               # joining later; ok
411               $total += $_->join() for MCE::Hobo->list();
412
413               printf "$desc <> duration: %0.03f secs, count: $total\n",
414                   time() - $start;
415
416               sleep 0.2;
417           }
418
419           benchmark_this('Redis      ', 8, 5.0, \&parallel_redis, $redis);
420           benchmark_this('Redis::Fast', 8, 5.0, \&parallel_redis, $rfast);
421           benchmark_this('MCE::Shared', 8, 5.0, \&parallel_array);
422
423       MCE::Hobo->list()
424          Returns a list of all hobo objects not yet joined.
425
426           @hobos = MCE::Hobo->list();
427
428       MCE::Hobo->list_pids()
429          Returns a list of all hobo pids not yet joined (available since
430          1.849).
431
432           @pids = MCE::Hobo->list_pids();
433
434           $SIG{INT} = $SIG{HUP} = $SIG{TERM} = sub {
435               # Signal workers and the shared manager all at once
436               CORE::kill('KILL', MCE::Hobo->list_pids(), MCE::Shared->pid());
437               exec('reset');
438           };
439
440       MCE::Hobo->list_running()
441          Returns a list of all hobo objects that are still running.
442
443           @hobos = MCE::Hobo->list_running();
444
445       MCE::Hobo->list_joinable()
446          Returns a list of all hobo objects that have completed running.
447          Thus, ready to be joined without blocking.
448
449           @hobos = MCE::Hobo->list_joinable();
450
451       MCE::Hobo->max_workers([ N ])
452          Getter and setter for max_workers. Specify a number or 'auto' to
453          acquire the total number of cores via MCE::Util::get_ncpu. Specify a
454          false value to set back to no limit.
455
456          API available since 1.835.
457
458       MCE::Hobo->pending()
459          Returns a count of all hobo objects not yet joined.
460
461           $count = MCE::Hobo->pending();
462
463       $hobo->result()
464          Returns the result obtained by "join", "wait_one", or "wait_all". If
465          the process has not yet exited, waits for the corresponding hobo to
466          complete its execution.
467
468           use MCE::Hobo;
469           use Time::HiRes qw(sleep);
470
471           sub task {
472               my ($id) = @_;
473               sleep $id * 0.333;
474               return $id;
475           }
476
477           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
478
479           # 1 while MCE::Hobo->wait_one();
480
481           while ( my $hobo = MCE::Hobo->wait_one() ) {
482               my $err = $hobo->error() || 'no error';
483               my $res = $hobo->result();
484               my $pid = $hobo->pid();
485
486               print "[$pid] $err : $res\n";
487           }
488
489          Like "join" described above, the context (void, scalar or list) for
490          the return value(s) is determined at the time "result" is called and
491          mostly "wantarray" aware.
492
493           my $hobo1 = MCE::Hobo->create( sub {
494               my @res = qw(foo bar baz);
495               return (@res);
496           });
497
498           my @res1 = $hobo1->result();  # ( foo, bar, baz )
499           my $res1 = $hobo1->result();  #   baz
500
501           my $hobo2 = MCE::Hobo->create( sub {
502               return 'foo';
503           });
504
505           my @res2 = $hobo2->result();  # ( foo )
506           my $res2 = $hobo2->result();  #   foo
507
508       MCE::Hobo->self()
509          Class method that allows a hobo to obtain its own MCE::Hobo object.
510
511       $hobo->pid()
512       $hobo->tid()
513          Returns the ID of the hobo.
514
515           pid: $$  process id
516           tid: $$  alias for pid
517
518       MCE::Hobo->pid()
519       MCE::Hobo->tid()
520          Class methods that allow a hobo to obtain its own ID.
521
522           pid: $$  process id
523           tid: $$  alias for pid
524
525       MCE::Hobo->wait_one()
526       MCE::Hobo->waitone()
527       MCE::Hobo->wait_all()
528       MCE::Hobo->waitall()
529          Meaningful for the manager process only, waits for one or all hobo
530          processes to complete execution. Afterwards, returns the
531          corresponding hobo objects.  If a hobo doesn't exist, returns the
532          "undef" value or an empty list for "wait_one" and "wait_all"
533          respectively.
534
535          The "waitone" and "waitall" methods are aliases since 1.827 for
536          backwards compatibility.
537
538           use MCE::Hobo;
539           use Time::HiRes qw(sleep);
540
541           sub task {
542               my $id = shift;
543               sleep $id * 0.333;
544               return $id;
545           }
546
547           MCE::Hobo->create('task', $_) for ( reverse 1 .. 3 );
548
549           # join, traditional use case
550           $_->join() for MCE::Hobo->list();
551
552           # wait_one, simplistic use case
553           1 while MCE::Hobo->wait_one();
554
555           # wait_one
556           while ( my $hobo = MCE::Hobo->wait_one() ) {
557               my $err = $hobo->error() || 'no error';
558               my $res = $hobo->result();
559               my $pid = $hobo->pid();
560
561               print "[$pid] $err : $res\n";
562           }
563
564           # wait_all
565           my @hobos = MCE::Hobo->wait_all();
566
567           for ( @hobos ) {
568               my $err = $_->error() || 'no error';
569               my $res = $_->result();
570               my $pid = $_->pid();
571
572               print "[$pid] $err : $res\n";
573           }
574
575       MCE::Hobo->yield( [ floating_seconds ] )
576          Prior API till 1.826.
577
578          Let this hobo yield CPU time to other workers. By default, the class
579          method calls "sleep(0.008)" on UNIX and "sleep(0.015)" on Windows
580          including Cygwin.
581
582           MCE::Hobo->yield();
583           MCE::Hobo->yield(0.05);
584
585           # total run time: 0.25 seconds, sleep occurring in parallel
586
587           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
588           MCE::Hobo->wait_all();
589
590          Current API available since 1.827.
591
592          Give other workers a chance to run, optionally for given time. Yield
593          behaves similarly to MCE's interval option. It throttles workers
594          from running too fast.  A demonstration is provided in the next
595          section for fetching URLs in parallel.
596
597          The default "floating_seconds" is 0.008 and 0.015 on UNIX and
598          Windows, respectively. Pass 0 if simply wanting to give other
599          workers a chance to run.
600
601           # total run time: 1.00 second
602
603           MCE::Hobo->create( sub { MCE::Hobo->yield(0.25) } ) for 1 .. 4;
604           MCE::Hobo->wait_all();
605

THREADS-like DETACH CAPABILITY

607       Threads-like detach capability was added starting with the 1.867
608       release.
609
610       A threads example is shown first followed by the MCE::Hobo example. All
611       one needs to do is set the CHLD signal handler to IGNORE.
612       Unfortunately, this works on UNIX platforms only. The hobo process
613       restores the CHLD handler to default, so is able to deeply spin workers
614       and reap if desired.
615
616        use threads;
617
618        for ( 1 .. 8 ) {
619            async {
620                # do something
621            }->detach;
622        }
623
624        use MCE::Hobo;
625
626        # Have the OS reap workers automatically when exiting.
627        # The on_finish option is ignored if specified (no-op).
628        # Ensure not inside a thread on UNIX platforms.
629
630        $SIG{CHLD} = 'IGNORE';
631
632        for ( 1 .. 8 ) {
633            mce_async {
634                # do something
635            };
636        }
637
638        # Optionally, wait for any remaining workers before leaving.
639        # This is necessary if workers are consuming shared objects,
640        # constructed via MCE::Shared.
641
642        MCE::Hobo->wait_all;
643
644       The following is another way and works on Windows.  Here, the on_finish
645       handler works as usual.
646
647        use MCE::Hobo;
648
649        MCE::Hobo->init(
650            on_finish => sub {
651                ...
652            },
653        );
654
655        for ( 1 .. 8 ) {
656            $_->join for MCE::Hobo->list_joinable;
657            mce_async {
658                # do something
659            };
660        }
661
662        MCE::Hobo->wait_all;
663

PARALLEL::FORKMANAGER-like DEMONSTRATION

665       MCE::Hobo behaves similarly to threads for the most part. It also
666       provides Parallel::ForkManager-like capabilities. The
667       "Parallel::ForkManager" example is shown first followed by a version
668       using "MCE::Hobo".
669
670       Parallel::ForkManager
671           use strict;
672           use warnings;
673
674           use Parallel::ForkManager;
675           use Time::HiRes 'time';
676
677           my $start = time;
678
679           my $pm = Parallel::ForkManager->new(10);
680           $pm->set_waitpid_blocking_sleep(0);
681
682           $pm->run_on_finish( sub {
683               my ($pid, $exit_code, $ident, $exit_signal, $core_dumped, $resp) = @_;
684               print "child $pid completed: $ident => ", $resp->[0], "\n";
685           });
686
687           DATA_LOOP:
688           foreach my $data ( 1..2000 ) {
689               # forks and returns the pid for the child
690               my $pid = $pm->start($data) and next DATA_LOOP;
691               my $ret = [ $data * 2 ];
692
693               $pm->finish(0, $ret);
694           }
695
696           $pm->wait_all_children;
697
698           printf STDERR "duration: %0.03f seconds\n", time - $start;
699
700       MCE::Hobo
701           use strict;
702           use warnings;
703
704           use MCE::Hobo 1.843;
705           use Time::HiRes 'time';
706
707           my $start = time;
708
709           MCE::Hobo->init(
710               max_workers => 10,
711               on_finish   => sub {
712                   my ($pid, $exit_code, $ident, $exit_signal, $error, $resp) = @_;
713                   print "child $pid completed: $ident => ", $resp->[0], "\n";
714               }
715           );
716
717           foreach my $data ( 1..2000 ) {
718               MCE::Hobo->create( $data, sub {
719                   [ $data * 2 ];
720               });
721           }
722
723           MCE::Hobo->wait_all;
724
725           printf STDERR "duration: %0.03f seconds\n", time - $start;
726
727       Time to spin 2,000 workers and obtain results (in seconds).
728          Results were obtained on a MacBook Pro (2.6 GHz ~ 3.6 GHz with Turbo
729          Boost).  Parallel::ForkManager 2.02 uses Moo. Therefore, I ran again
730          with Moo loaded at the top of the script.
731
732           MCE::Hobo uses MCE::Shared to retrieve data during reaping.
733           MCE::Child uses MCE::Channel, no shared-manager.
734
735                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
736
737           MCE::Child 1.843  19.099s  17.091s  0.965s  1.534s  1.229s
738            MCE::Hobo 1.843  20.514s  19.594s  1.246s  1.629s  1.613s
739                P::FM 1.20   19.703s  19.235s  0.875s  1.445s  1.346s
740
741           MCE::Child 1.843  20.426s  18.417s  1.116s  1.632s  1.338s  Moo loaded
742            MCE::Hobo 1.843  21.809s  20.810s  1.407s  1.759s  1.722s  Moo loaded
743                P::FM 2.02   21.668s  25.927s  1.882s  2.612s  2.483s  Moo used
744
745       Set posix_exit to avoid all END and destructor processing.
746          This is helpful for reducing overhead when workers exit. Ditto if
747          using a Perl module not parallel safe. The option is ignored on
748          Windows "$^O eq 'MSWin32'".
749
750           MCE::Child->init( posix_exit => 1, ... );
751            MCE::Hobo->init( posix_exit => 1, ... );
752
753                    Version  Cygwin   Windows  Linux   macOS  FreeBSD
754
755           MCE::Child 1.843  19.815s  ignored  0.824s  1.284s  1.245s  Moo loaded
756            MCE::Hobo 1.843  21.029s  ignored  0.953s  1.335s  1.439s  Moo loaded
757

PARALLEL HTTP GET DEMONSTRATION USING ANYEVENT

759       This demonstration constructs two queues, two handles, starts the
760       shared-manager process if needed, and spawns four workers.  For this
761       demonstration, am chunking 64 URLs per job. In reality, one may run
762       with 200 workers and chunk 300 URLs on a 24-way box.
763
764        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
765        # perl demo.pl              -- all output
766        # perl demo.pl  >/dev/null  -- mngr/hobo output
767        # perl demo.pl 2>/dev/null  -- show results only
768        #
769        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
770
771        use strict;
772        use warnings;
773
774        use AnyEvent;
775        use AnyEvent::HTTP;
776        use Time::HiRes qw( time );
777
778        use MCE::Hobo;
779        use MCE::Shared;
780
781        # Construct two queues, input and return.
782
783        my $que = MCE::Shared->queue();
784        my $ret = MCE::Shared->queue();
785
786        # Construct shared handles for serializing output from many workers
787        # writing simultaneously. This prevents garbled output.
788
789        mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
790        mce_open my $ERR, ">>", \*STDERR or die "open error: $!";
791
792        # Spawn workers early for minimum memory consumption.
793
794        MCE::Hobo->create({ posix_exit => 1 }, 'task', $_) for 1 .. 4;
795
796        # Obtain or generate input data for workers to process.
797
798        my ( $count, @urls ) = ( 0 );
799
800        push @urls, map { "http://127.0.0.$_/"   } 1..254;
801        push @urls, map { "http://192.168.0.$_/" } 1..254; # 508 URLs total
802
803        while ( @urls ) {
804            my @chunk = splice(@urls, 0, 64);
805            $que->enqueue( { ID => ++$count, INPUT => \@chunk } );
806        }
807
808        # So that workers leave the loop after consuming the queue.
809
810        $que->end();
811
812        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
813        # Loop for the manager process. The manager may do other work if
814        # need be and periodically check $ret->pending() not shown here.
815        #
816        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
817
818        my $start = time;
819
820        printf {$ERR} "Mngr - entering loop\n";
821
822        while ( $count ) {
823            my ( $result, $failed ) = $ret->dequeue( 2 );
824
825            # Remove ID from result, so not treated as a URL item.
826
827            printf {$ERR} "Mngr - received job %s\n", delete $result->{ID};
828
829            # Display the URL and the size captured.
830
831            foreach my $url ( keys %{ $result } ) {
832                printf {$OUT} "%s: %d\n", $url, length($result->{$url})
833                    if $result->{$url};  # url has content
834            }
835
836            # Display URLs it could not reach.
837
838            if ( @{ $failed } ) {
839                foreach my $url ( @{ $failed } ) {
840                    print {$OUT} "Failed: $url\n";
841                }
842            }
843
844            # Decrement the count.
845
846            $count--;
847        }
848
849        MCE::Hobo->wait_all();
850
851        printf {$ERR} "Mngr - exiting loop\n\n";
852        printf {$ERR} "Duration: %0.3f seconds\n\n", time - $start;
853
854        exit;
855
856        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
857        # Hobo processes enqueue two items ( $result and $failed ) per each
858        # job for the manager process. Likewise, the manager process dequeues
859        # two items above. Optionally, hobo processes may include the ID in
860        # the result.
861        #
862        # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
863
864        sub task {
865            my ( $id ) = @_;
866            printf {$ERR} "Hobo $id entering loop\n";
867
868            while ( my $job = $que->dequeue() ) {
869                my ( $result, $failed ) = ( { ID => $job->{ID} }, [ ] );
870
871                # Walk URLs, provide a hash and array refs for data.
872
873                printf {$ERR} "Hobo $id running  job $job->{ID}\n";
874                walk( $job, $result, $failed );
875
876                # Send results to the manager process.
877
878                $ret->enqueue( $result, $failed );
879            }
880
881            printf {$ERR} "Hobo $id exiting loop\n";
882        }
883
884        sub walk {
885            my ( $job, $result, $failed ) = @_;
886
887            # Yielding is critical when running an event loop in parallel.
888            # Not doing so means that the app may reach contention points
889            # with the firewall and likely impose unnecessary hardship at
890            # the OS level. The idea here is not to have multiple workers
891            # initiate HTTP requests to a batch of URLs at the same time.
892            # Yielding in 1.827+ behaves similarly to scatter to have
893            # the hobo process run solo for a fraction of time.
894
895            MCE::Hobo->yield( 0.03 );   # MCE::Hobo 1.827+
896
897            my $cv = AnyEvent->condvar();
898
899            # Populate the hash ref for the URLs it could reach.
900            # Do not mix AnyEvent timeout with hobo timeout.
901            # Therefore, choose event timeout when available.
902
903            foreach my $url ( @{ $job->{INPUT} } ) {
904                $cv->begin();
905                http_get $url, timeout => 2, sub {
906                    my ( $data, $headers ) = @_;
907                    $result->{$url} = $data;
908                    $cv->end();
909                };
910            }
911
912            $cv->recv();
913
914            # Populate the array ref for URLs it could not reach.
915
916            foreach my $url ( @{ $job->{INPUT} } ) {
917                push @{ $failed }, $url unless (exists $result->{ $url });
918            }
919
920            return;
921        }
922
923        __END__
924
925        $ perl demo.pl
926
927        Hobo 1 entering loop
928        Hobo 2 entering loop
929        Hobo 3 entering loop
930        Mngr - entering loop
931        Hobo 2 running  job 2
932        Hobo 3 running  job 3
933        Hobo 1 running  job 1
934        Hobo 4 entering loop
935        Hobo 4 running  job 4
936        Hobo 2 running  job 5
937        Mngr - received job 2
938        Hobo 3 running  job 6
939        Mngr - received job 3
940        Hobo 1 running  job 7
941        Mngr - received job 1
942        Hobo 4 running  job 8
943        Mngr - received job 4
944        http://192.168.0.1/: 3729
945        Hobo 2 exiting loop
946        Mngr - received job 5
947        Hobo 3 exiting loop
948        Mngr - received job 6
949        Hobo 1 exiting loop
950        Mngr - received job 7
951        Hobo 4 exiting loop
952        Mngr - received job 8
953        Mngr - exiting loop
954
955        Duration: 4.131 seconds
956

CROSS-PLATFORM TEMPLATE FOR BINARY EXECUTABLE

958       Making an executable is possible with the PAR::Packer module.  On the
959       Windows platform, threads, threads::shared, and exiting via threads are
960       necessary for the binary to exit successfully.
961
962        # https://metacpan.org/pod/PAR::Packer
963        # https://metacpan.org/pod/pp
964        #
965        #   pp -o demo.exe demo.pl
966        #   ./demo.exe
967
968        use strict;
969        use warnings;
970
971        use if $^O eq "MSWin32", "threads";
972        use if $^O eq "MSWin32", "threads::shared";
973
974        # Include minimum dependencies for MCE::Hobo.
975        # Add other modules required by your application here.
976
977        use Storable ();
978        use Time::HiRes ();
979
980        # use IO::FDPass ();  # optional: for condvar, handle, queue
981        # use Sereal ();      # optional: for faster serialization
982
983        use MCE::Hobo;
984        use MCE::Shared;
985
986        # For PAR to work on the Windows platform, one must include manually
987        # any shared modules used by the application.
988
989        # use MCE::Shared::Array;    # if using MCE::Shared->array
990        # use MCE::Shared::Cache;    # if using MCE::Shared->cache
991        # use MCE::Shared::Condvar;  # if using MCE::Shared->condvar
992        # use MCE::Shared::Handle;   # if using MCE::Shared->handle, mce_open
993        # use MCE::Shared::Hash;     # if using MCE::Shared->hash
994        # use MCE::Shared::Minidb;   # if using MCE::Shared->minidb
995        # use MCE::Shared::Ordhash;  # if using MCE::Shared->ordhash
996        # use MCE::Shared::Queue;    # if using MCE::Shared->queue
997        # use MCE::Shared::Scalar;   # if using MCE::Shared->scalar
998
999        # Et cetera. Only load modules needed for your application.
1000
1001        use MCE::Shared::Sequence;   # if using MCE::Shared->sequence
1002
1003        my $seq = MCE::Shared->sequence( 1, 9 );
1004
1005        sub task {
1006            my ( $id ) = @_;
1007            while ( defined ( my $num = $seq->next() ) ) {
1008                print "$id: $num\n";
1009                sleep 1;
1010            }
1011        }
1012
1013        sub main {
1014            MCE::Hobo->new( \&task, $_ ) for 1 .. 3;
1015            MCE::Hobo->wait_all();
1016        }
1017
1018        # Main must run inside a thread on the Windows platform or workers
1019        # will fail during exiting, causing the exe to crash. The reason is
1020        # that PAR or a dependency isn't multi-process safe.
1021
1022        ( $^O eq "MSWin32" ) ? threads->create(\&main)->join() : main();
1023
1024        threads->exit(0) if $INC{"threads.pm"};
1025

CREDITS

1027       The inspiration for "MCE::Hobo" comes from wanting "threads"-like
1028       behavior for processes. Both can run side-by-side including safe-use by
1029       MCE workers.  Likewise, the documentation resembles "threads".
1030
1031       The inspiration for "wait_all" and "wait_one" comes from the
1032       "Parallel::WorkUnit" module.
1033

SEE ALSO

1035       ·  forks
1036
1037       ·  forks::BerkeleyDB
1038
1039       ·  MCE::Child
1040
1041       ·  Parallel::ForkManager
1042
1043       ·  Parallel::Loops
1044
1045       ·  Parallel::Prefork
1046
1047       ·  Parallel::WorkUnit
1048
1049       ·  Proc::Fork
1050
1051       ·  Thread::Tie
1052
1053       ·  threads
1054

INDEX

1056       MCE, MCE::Channel, MCE::Shared
1057

AUTHOR

1059       Mario E. Roy, <marioeroy AT gmail DOT com>
1060
1061
1062
1063perl v5.32.0                      2020-08-02                      MCE::Hobo(3)
Impressum